From 145314b782c765802463031ee3e075de857a823d Mon Sep 17 00:00:00 2001 From: Yelli Date: Wed, 25 Nov 2020 11:11:44 +0800 Subject: [PATCH] [FIX-4034][server] fix sqoop import fail (#4036) * fix #4043, sqoop import query fail * fix #4043, sqoop task hard code & code style * add license for SqoopConstants * add private constructor for SqoopConstants * fixed sqoop mysql pwd have special character * fix checkstyle * fix sqoop task log * remove unused constants --- .../dolphinscheduler/common/Constants.java | 14 +++ .../{QueryType.java => SqoopQueryType.java} | 30 +++-- .../worker/task/sqoop/SqoopConstants.java | 74 +++++++++++ .../server/worker/task/sqoop/SqoopTask.java | 49 +++++--- .../task/sqoop/generator/CommonGenerator.java | 63 +++++----- .../sqoop/generator/ISourceGenerator.java | 6 +- .../sqoop/generator/ITargetGenerator.java | 6 +- .../sqoop/generator/SqoopJobGenerator.java | 44 ++++--- .../sources/HdfsSourceGenerator.java | 38 +++--- .../sources/HiveSourceGenerator.java | 49 +++++--- .../sources/MysqlSourceGenerator.java | 114 +++++++++-------- .../targets/HdfsTargetGenerator.java | 58 +++++---- .../targets/HiveTargetGenerator.java | 70 ++++++----- .../targets/MysqlTargetGenerator.java | 90 +++++++------ .../worker/task/sqoop/SqoopTaskTest.java | 119 +++++++++++------- 15 files changed, 516 insertions(+), 308 deletions(-) rename dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/{QueryType.java => SqoopQueryType.java} (69%) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 6da58f7c9a..4a696d2e37 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -221,6 +221,11 @@ public final class Constants { */ public static final String COLON = ":"; + /** + * SPACE " " + */ + public static final String SPACE = " "; + /** * SINGLE_SLASH / */ @@ -231,6 +236,15 @@ public final class Constants { */ public static final String DOUBLE_SLASH = "//"; + /** + * SINGLE_QUOTES "'" + */ + public static final String SINGLE_QUOTES = "'"; + /** + * DOUBLE_QUOTES "\"" + */ + public static final String DOUBLE_QUOTES = "\""; + /** * SEMICOLON ; */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java similarity index 69% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java index 13820b4bab..4d279f5d8e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java @@ -14,20 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. 
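A note on the Constants additions above: the generators in this patch assemble the sqoop command line token by token, so SPACE, SINGLE_QUOTES and DOUBLE_QUOTES replace the hard-coded literals the old generators embedded in strings. A minimal sketch of the intended composition (hypothetical pwd variable; DB_PWD comes from the new SqoopConstants class added below):

    // builds: --password "p@ssw0rd" (the double quotes guard special characters in the pwd)
    StringBuilder sb = new StringBuilder();
    sb.append(Constants.SPACE).append(SqoopConstants.DB_PWD)
        .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES)
        .append(pwd).append(Constants.DOUBLE_QUOTES);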
*/ + package org.apache.dolphinscheduler.common.enums; -public enum QueryType { +public enum SqoopQueryType { + + FORM(0, "SQOOP_QUERY_FORM"), + SQL(1, "SQOOP_QUERY_SQL"); + + private final Integer code; + + private final String desc; - FORM, - SQL; + SqoopQueryType(Integer code, String desc) { + this.code = code; + this.desc = desc; + } + + public Integer getCode() { + return code; + } - public static QueryType getEnum(int value){ - for (QueryType e:QueryType.values()) { - if(e.ordinal() == value) { - return e; - } - } - //For values out of enum scope - return null; + public String getDesc() { + return desc; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java new file mode 100644 index 0000000000..c19a1e0fff --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
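The rewrite above replaces the ordinal-based QueryType.getEnum(int) lookup with an explicit code field, so the integer persisted in task params stays stable even if enum constants are later reordered. Callers now compare against getCode() directly, as MysqlSourceGenerator does further down; a sketch (srcQueryType being the int parsed from the task params):

    if (srcQueryType == SqoopQueryType.FORM.getCode()) {
        // table/columns style import
    } else if (srcQueryType == SqoopQueryType.SQL.getCode()) {
        // free-form --query import
    }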
+ */ + +package org.apache.dolphinscheduler.server.worker.task.sqoop; + +public final class SqoopConstants { + + private SqoopConstants() { + } + + //sqoop general param + public static final String SQOOP = "sqoop"; + public static final String SQOOP_MR_JOB_NAME = "mapred.job.name"; + public static final String SQOOP_PARALLELISM = "-m"; + public static final String FIELDS_TERMINATED_BY = "--fields-terminated-by"; + public static final String LINES_TERMINATED_BY = "--lines-terminated-by"; + public static final String FIELD_NULL_PLACEHOLDER = "--null-non-string 'NULL' --null-string 'NULL'"; + + //sqoop db + public static final String DB_CONNECT = "--connect"; + public static final String DB_USERNAME = "--username"; + public static final String DB_PWD = "--password"; + public static final String TABLE = "--table"; + public static final String COLUMNS = "--columns"; + public static final String QUERY_WHERE = "where"; + public static final String QUERY = "--query"; + public static final String QUERY_CONDITION = "AND \\$CONDITIONS"; + public static final String QUERY_WITHOUT_CONDITION = "WHERE \\$CONDITIONS"; + public static final String MAP_COLUMN_HIVE = "--map-column-hive"; + public static final String MAP_COLUMN_JAVA = "--map-column-java"; + + + //sqoop hive source + public static final String HCATALOG_DATABASE = "--hcatalog-database"; + public static final String HCATALOG_TABLE = "--hcatalog-table"; + public static final String HCATALOG_PARTITION_KEYS = "--hcatalog-partition-keys"; + public static final String HCATALOG_PARTITION_VALUES = "--hcatalog-partition-values"; + + //sqoop hdfs + public static final String HDFS_EXPORT_DIR = "--export-dir"; + public static final String TARGET_DIR = "--target-dir"; + public static final String COMPRESSION_CODEC = "--compression-codec"; + + //sqoop hive + public static final String HIVE_IMPORT = "--hive-import"; + public static final String HIVE_TABLE = "--hive-table"; + public static final String CREATE_HIVE_TABLE = "--create-hive-table"; + public static final String HIVE_DROP_IMPORT_DELIMS = "--hive-drop-import-delims"; + public static final String HIVE_OVERWRITE = "--hive-overwrite"; + public static final String DELETE_TARGET_DIR = "--delete-target-dir"; + public static final String HIVE_DELIMS_REPLACEMENT = "--hive-delims-replacement"; + public static final String HIVE_PARTITION_KEY = "--hive-partition-key"; + public static final String HIVE_PARTITION_VALUE = "--hive-partition-value"; + + //sqoop update model + public static final String UPDATE_KEY = "--update-key"; + public static final String UPDATE_MODE = "--update-mode"; + + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java index c66fe95592..00d94f01bf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java @@ -14,64 +14,73 @@ * See the License for the specific language governing permissions and * limitations under the License. 
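These tokens are only ever concatenated into one command line, never parsed back; the shape they aim for can be read off the expected strings in SqoopTaskTest at the end of this patch, e.g. (abridged):

    sqoop import -D mapred.job.name=sqoop_import -m 1 \
        --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password "123456" \
        --table person_2 --target-dir /ods/tmp/test/person7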
 */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
-import org.slf4j.Logger;
 
 import java.util.Map;
-import org.apache.dolphinscheduler.common.utils.*;
+
+import org.slf4j.Logger;
 
 /**
  * sqoop task extends the shell task
  */
 public class SqoopTask extends AbstractYarnTask {
 
+    /**
+     * sqoop task params
+     */
     private SqoopParameters sqoopParameters;
 
     /**
      * taskExecutionContext
      */
-    private TaskExecutionContext taskExecutionContext;
+    private final TaskExecutionContext sqoopTaskExecutionContext;
 
-    public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger){
-        super(taskExecutionContext,logger);
-        this.taskExecutionContext = taskExecutionContext;
+    public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger) {
+        super(taskExecutionContext, logger);
+        this.sqoopTaskExecutionContext = taskExecutionContext;
     }
 
     @Override
-    public void init() throws Exception {
-        logger.info("sqoop task params {}", taskExecutionContext.getTaskParams());
+    public void init() {
+        logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams());
         sqoopParameters =
-            JSONUtils.parseObject(taskExecutionContext.getTaskParams(),SqoopParameters.class);
-        if (!sqoopParameters.checkParameters()) {
-            throw new RuntimeException("sqoop task params is not valid");
+            JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class);
+        //check sqoop task params
+        if (null == sqoopParameters) {
+            throw new IllegalArgumentException("Sqoop Task params is null");
         }
+
+        if (!sqoopParameters.checkParameters()) {
+            throw new IllegalArgumentException("Sqoop Task params check fail");
+        }
     }
 
     @Override
-    protected String buildCommand() throws Exception {
+    protected String buildCommand() {
         //get sqoop scripts
         SqoopJobGenerator generator = new SqoopJobGenerator();
-        String script = generator.generateSqoopJob(sqoopParameters,taskExecutionContext);
+        String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
 
-        Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
-            taskExecutionContext.getDefinedParams(),
-            sqoopParameters.getLocalParametersMap(),
-            CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
-            taskExecutionContext.getScheduleTime());
+        Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()),
+            sqoopTaskExecutionContext.getDefinedParams(),
+            sqoopParameters.getLocalParametersMap(),
+            CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
+            sqoopTaskExecutionContext.getScheduleTime());
 
-        if(paramsMap != null){
-            String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
+        if (paramsMap != null) {
+            String resultScripts = ParameterUtils.convertParameterPlaceholders(script,
                ParamUtils.convert(paramsMap));
             logger.info("sqoop script: {}", resultScripts);
             return resultScripts;
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
index ffca73544d..e3e7c9a3ae 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
@@ -14,71 +14,72 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
 
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * common script generator
  */
 public class CommonGenerator {
 
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final Logger logger = LoggerFactory.getLogger(CommonGenerator.class);
 
     public String generate(SqoopParameters sqoopParameters) {
 
-        StringBuilder result = new StringBuilder();
-        try{
-            result.append("sqoop ")
-                .append(sqoopParameters.getModelType());
-            //set sqoop job name
-            result.append(" -D mapred.job.name")
-                .append(Constants.EQUAL_SIGN)
-                .append(sqoopParameters.getJobName());
+        StringBuilder commonSb = new StringBuilder();
+
+        try {
+            //sqoop task model
+            commonSb.append(SqoopConstants.SQOOP)
+                .append(Constants.SPACE)
+                .append(sqoopParameters.getModelType());
 
-            //set hadoop custom param
+            //sqoop map-reduce job name
+            commonSb.append(Constants.SPACE).append(Constants.D).append(Constants.SPACE)
+                .append(String.format("%s%s%s", SqoopConstants.SQOOP_MR_JOB_NAME,
+                    Constants.EQUAL_SIGN, sqoopParameters.getJobName()));
+
+            //hadoop custom param
             List<Property> hadoopCustomParams = sqoopParameters.getHadoopCustomParams();
             if (CollectionUtils.isNotEmpty(hadoopCustomParams)) {
                 for (Property hadoopCustomParam : hadoopCustomParams) {
-                    String hadoopCustomParamStr = " -D " + hadoopCustomParam.getProp()
-                        + Constants.EQUAL_SIGN + hadoopCustomParam.getValue();
+                    String hadoopCustomParamStr = String.format("%s%s%s", hadoopCustomParam.getProp(),
+                        Constants.EQUAL_SIGN, hadoopCustomParam.getValue());
 
-                    if (StringUtils.isNotEmpty(hadoopCustomParamStr)) {
-                        result.append(hadoopCustomParamStr);
-                    }
+                    commonSb.append(Constants.SPACE).append(Constants.D)
+                        .append(Constants.SPACE).append(hadoopCustomParamStr);
                 }
             }
 
-            //set sqoop advanced custom param
+            //sqoop custom params
             List<Property> sqoopAdvancedParams = sqoopParameters.getSqoopAdvancedParams();
             if (CollectionUtils.isNotEmpty(sqoopAdvancedParams)) {
                 for (Property sqoopAdvancedParam : sqoopAdvancedParams) {
-                    String sqoopAdvancedParamStr = " " + sqoopAdvancedParam.getProp()
-                        + " " + sqoopAdvancedParam.getValue();
-                    if (StringUtils.isNotEmpty(sqoopAdvancedParamStr)) {
-                        result.append(sqoopAdvancedParamStr);
-                    }
+
commonSb.append(Constants.SPACE).append(sqoopAdvancedParam.getProp()) + .append(Constants.SPACE).append(sqoopAdvancedParam.getValue()); } } - if(sqoopParameters.getConcurrency() >0){ - result.append(" -m ") - .append(sqoopParameters.getConcurrency()); + //sqoop parallelism + if (sqoopParameters.getConcurrency() > 0) { + commonSb.append(Constants.SPACE).append(SqoopConstants.SQOOP_PARALLELISM) + .append(Constants.SPACE).append(sqoopParameters.getConcurrency()); } - }catch (Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop task general param build failed: [%s]", e.getMessage())); } - return result.toString(); + return commonSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java index 841654b699..e6a9576a25 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; @@ -26,9 +27,10 @@ public interface ISourceGenerator { /** * generate the source script - * @param sqoopParameters sqoopParameters + * + * @param sqoopParameters sqoopParameters * @param taskExecutionContext taskExecutionContext * @return source script */ - String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext); + String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java index 7bdaf49e83..eb355a7438 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
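ISourceGenerator and ITargetGenerator are the two seams that SqoopJobGenerator (next file) composes with the common prefix; for the TEMPLATE job type the whole thing boils down to string concatenation, roughly (a sketch, not the literal class body):

    String script = commonGenerator.generate(sqoopParameters)                  // "sqoop import -D ... -m 1"
        + sourceGenerator.generate(sqoopParameters, taskExecutionContext)      // " --connect ... --table ..."
        + targetGenerator.generate(sqoopParameters, taskExecutionContext);     // " --target-dir ..."

Each generator returns its fragment with a leading Constants.SPACE, which is why plain concatenation yields a well-formed command.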
*/ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; @@ -26,9 +27,10 @@ public interface ITargetGenerator { /** * generate the target script - * @param sqoopParameters sqoopParameters + * + * @param sqoopParameters sqoopParameters * @param taskExecutionContext taskExecutionContext * @return target script */ - String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext); + String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java index 6bc94d0f35..9feaffa5c6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; import org.apache.dolphinscheduler.common.enums.SqoopJobType; @@ -46,37 +47,38 @@ public class SqoopJobGenerator { /** * common script generator */ - private CommonGenerator commonGenerator; + private final CommonGenerator commonGenerator; - public SqoopJobGenerator(){ + public SqoopJobGenerator() { commonGenerator = new CommonGenerator(); } - private void createSqoopJobGenerator(String sourceType,String targetType){ + private void createSqoopJobGenerator(String sourceType, String targetType) { sourceGenerator = createSourceGenerator(sourceType); targetGenerator = createTargetGenerator(targetType); } /** * get the final sqoop scripts - * @param sqoopParameters - * @return + * + * @param sqoopParameters sqoop params + * @return sqoop scripts */ - public String generateSqoopJob(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext){ + public String generateSqoopJob(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { String sqoopScripts = ""; if (SqoopJobType.TEMPLATE.getDescp().equals(sqoopParameters.getJobType())) { - createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType()); - if(sourceGenerator == null || targetGenerator == null){ + createSqoopJobGenerator(sqoopParameters.getSourceType(), sqoopParameters.getTargetType()); + if (sourceGenerator == null || targetGenerator == null) { throw new RuntimeException("sqoop task source type or target type is null"); } - sqoopScripts = commonGenerator.generate(sqoopParameters) - + sourceGenerator.generate(sqoopParameters,taskExecutionContext) - + targetGenerator.generate(sqoopParameters,taskExecutionContext); + sqoopScripts = String.format("%s%s%s", commonGenerator.generate(sqoopParameters), + sourceGenerator.generate(sqoopParameters, taskExecutionContext), + targetGenerator.generate(sqoopParameters, taskExecutionContext)); } else if (SqoopJobType.CUSTOM.getDescp().equals(sqoopParameters.getJobType())) { - sqoopScripts = sqoopParameters.getCustomShell().replaceAll("\\r\\n", "\n"); + sqoopScripts = sqoopParameters.getCustomShell().replaceAll("\\r\\n", "\n"); } return sqoopScripts; @@ -84,11 +86,12 @@ public class SqoopJobGenerator { /** * get the 
source generator - * @param sourceType - * @return + * + * @param sourceType sqoop source type + * @return sqoop source generator */ - private ISourceGenerator createSourceGenerator(String sourceType){ - switch (sourceType){ + private ISourceGenerator createSourceGenerator(String sourceType) { + switch (sourceType) { case MYSQL: return new MysqlSourceGenerator(); case HIVE: @@ -102,11 +105,12 @@ public class SqoopJobGenerator { /** * get the target generator - * @param targetType - * @return + * + * @param targetType sqoop target type + * @return sqoop target generator */ - private ITargetGenerator createTargetGenerator(String targetType){ - switch (targetType){ + private ITargetGenerator createTargetGenerator(String targetType) { + switch (targetType) { case MYSQL: return new MysqlTargetGenerator(); case HIVE: diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java index ded0d6db17..549d5dba63 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java @@ -14,14 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,28 +34,30 @@ import org.slf4j.LoggerFactory; */ public class HdfsSourceGenerator implements ISourceGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(HdfsSourceGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); - try{ + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { + + StringBuilder hdfsSourceSb = new StringBuilder(); + + try { SourceHdfsParameter sourceHdfsParameter - = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHdfsParameter.class); - - if(sourceHdfsParameter != null){ - if(StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())){ - result.append(" --export-dir ") - .append(sourceHdfsParameter.getExportDir()); - }else{ - throw new Exception("--export-dir is null"); + = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHdfsParameter.class); + + if (null != sourceHdfsParameter) { + if (StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())) { + hdfsSourceSb.append(Constants.SPACE).append(SqoopConstants.HDFS_EXPORT_DIR) + 
                        .append(Constants.SPACE).append(sourceHdfsParameter.getExportDir());
+                } else {
+                    throw new IllegalArgumentException("Sqoop hdfs export dir is null");
                 }
             }
-        }catch (Exception e){
-            logger.error("get hdfs source failed",e);
+        } catch (Exception e) {
+            logger.error(String.format("Sqoop hdfs source params build failed: [%s]", e.getMessage()));
         }
 
-        return result.toString();
+        return hdfsSourceSb.toString();
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java
index 131b616025..3229dcada7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java
@@ -14,14 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,33 +34,40 @@ import org.slf4j.LoggerFactory;
  */
 public class HiveSourceGenerator implements ISourceGenerator {
 
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final Logger logger = LoggerFactory.getLogger(HiveSourceGenerator.class);
 
     @Override
-    public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
-        StringBuilder sb = new StringBuilder();
-        try{
+    public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+        StringBuilder hiveSourceSb = new StringBuilder();
+
+        try {
             SourceHiveParameter sourceHiveParameter
-                = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHiveParameter.class);
-            if(sourceHiveParameter != null){
-                if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())){
-                    sb.append(" --hcatalog-database ").append(sourceHiveParameter.getHiveDatabase());
+                = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHiveParameter.class);
+
+            if (null != sourceHiveParameter) {
+                if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())) {
+                    hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_DATABASE)
+                        .append(Constants.SPACE).append(sourceHiveParameter.getHiveDatabase());
                 }
-                if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())){
-                    sb.append(" --hcatalog-table ").append(sourceHiveParameter.getHiveTable());
+                if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())) {
+                    hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_TABLE)
+
.append(Constants.SPACE).append(sourceHiveParameter.getHiveTable()); } - if(StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())&& - StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())){ - sb.append(" --hcatalog-partition-keys ").append(sourceHiveParameter.getHivePartitionKey()) - .append(" --hcatalog-partition-values ").append(sourceHiveParameter.getHivePartitionValue()); + if (StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey()) + && StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())) { + hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_KEYS) + .append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionKey()) + .append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_VALUES) + .append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionValue()); } } - }catch (Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop hive source params build failed: [%s]", e.getMessage())); } - return sb.toString(); + return hiveSourceSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java index 47430d1102..63738c8d04 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java @@ -14,106 +14,118 @@ * See the License for the specific language governing permissions and * limitations under the License. 
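For a partitioned hive source, the builder above would emit a fragment along the lines of (hypothetical values, borrowed from the test params elsewhere in this patch):

    --hcatalog-database stg --hcatalog-table person_internal_2 --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-16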
*/ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbType; -import org.apache.dolphinscheduler.common.enums.QueryType; +import org.apache.dolphinscheduler.common.enums.SqoopQueryType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * mysql source generator */ public class MysqlSourceGenerator implements ISourceGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(MysqlSourceGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); - try { - SourceMysqlParameter sourceMysqlParameter - = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class); + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { + + StringBuilder mysqlSourceSb = new StringBuilder(); + try { + SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class); SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext(); - if(sourceMysqlParameter != null){ + if (null != sourceMysqlParameter) { BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getSourcetype()), - sqoopTaskExecutionContext.getSourceConnectionParams()); - if(baseDataSource != null){ - result.append(" --connect ") - .append(baseDataSource.getJdbcUrl()) - .append(" --username ") - .append(baseDataSource.getUser()) - .append(" --password ") - .append(baseDataSource.getPassword()); - - if(sourceMysqlParameter.getSrcQueryType() == QueryType.FORM.ordinal()){ - if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())){ - result.append(" --table ").append(sourceMysqlParameter.getSrcTable()); + sqoopTaskExecutionContext.getSourceConnectionParams()); + + if (null != baseDataSource) { + + mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT) + .append(Constants.SPACE).append(baseDataSource.getJdbcUrl()) + .append(Constants.SPACE).append(SqoopConstants.DB_USERNAME) + .append(Constants.SPACE).append(baseDataSource.getUser()) + .append(Constants.SPACE).append(SqoopConstants.DB_PWD) + .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES); + + //sqoop table & sql 
query
+                    if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.FORM.getCode()) {
+                        if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.TABLE)
+                                .append(Constants.SPACE).append(sourceMysqlParameter.getSrcTable());
                         }
 
-                    if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())){
-                        result.append(" --columns ").append(sourceMysqlParameter.getSrcColumns());
+                        if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS)
+                                .append(Constants.SPACE).append(sourceMysqlParameter.getSrcColumns());
                         }
+                    } else if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.SQL.getCode()
+                        && StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())) {
 
-                    }else if(sourceMysqlParameter.getSrcQueryType() == QueryType.SQL.ordinal()
-                        && StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())){
                         String srcQuery = sourceMysqlParameter.getSrcQuerySql();
-                        if(srcQuery.toLowerCase().contains("where")){
-                            srcQuery += " AND "+"$CONDITIONS";
-                        }else{
-                            srcQuery += " WHERE $CONDITIONS";
-                        }
-                        result.append(" --query \'").append(srcQuery).append("\'");
+                        mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY)
+                            .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(srcQuery);
 
+                        if (srcQuery.toLowerCase().contains(SqoopConstants.QUERY_WHERE)) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_CONDITION).append(Constants.DOUBLE_QUOTES);
+                        } else {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_WITHOUT_CONDITION).append(Constants.DOUBLE_QUOTES);
+                        }
                     }
 
-                    List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
+                    //sqoop hive map column
+                    List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
 
-                    if(mapColumnHive != null && !mapColumnHive.isEmpty()){
+                    if (null != mapColumnHive && !mapColumnHive.isEmpty()) {
                         StringBuilder columnMap = new StringBuilder();
-                        for(Property item:mapColumnHive){
-                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
+                        for (Property item : mapColumnHive) {
+                            columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
                         }
 
-                        if(StringUtils.isNotEmpty(columnMap.toString())){
-                            result.append(" --map-column-hive ")
-                                .append(columnMap.substring(0,columnMap.length() - 1));
+                        if (StringUtils.isNotEmpty(columnMap.toString())) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_HIVE)
+                                .append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1));
                         }
                     }
 
-                    List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
+                    //sqoop map column java
+                    List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
 
-                    if(mapColumnJava != null && !mapColumnJava.isEmpty()){
+                    if (null != mapColumnJava && !mapColumnJava.isEmpty()) {
                         StringBuilder columnMap = new StringBuilder();
-                        for(Property item:mapColumnJava){
-                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
+                        for (Property item : mapColumnJava) {
+                            columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
                         }
 
-                        if(StringUtils.isNotEmpty(columnMap.toString())){
-                            result.append(" --map-column-java ")
-                                .append(columnMap.substring(0,columnMap.length() - 1));
+                        if (StringUtils.isNotEmpty(columnMap.toString())) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_JAVA)
+
.append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1)); } } } } - }catch (Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop task mysql source params build failed: [%s]", e.getMessage())); } - return result.toString(); + return mysqlSourceSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java index f94d10a6d1..3ea32546ec 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java @@ -14,14 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,47 +34,53 @@ import org.slf4j.LoggerFactory; */ public class HdfsTargetGenerator implements ITargetGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(HdfsTargetGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); - try{ + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { + + StringBuilder hdfsTargetSb = new StringBuilder(); + + try { TargetHdfsParameter targetHdfsParameter = - JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHdfsParameter.class); + JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHdfsParameter.class); - if(targetHdfsParameter != null){ + if (null != targetHdfsParameter) { - if(StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())){ - result.append(" --target-dir ").append(targetHdfsParameter.getTargetPath()); + if (StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.TARGET_DIR) + .append(Constants.SPACE).append(targetHdfsParameter.getTargetPath()); } - if(StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())){ - result.append(" --compression-codec ").append(targetHdfsParameter.getCompressionCodec()); + if (StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.COMPRESSION_CODEC) + .append(Constants.SPACE).append(targetHdfsParameter.getCompressionCodec()); } - 
if(StringUtils.isNotEmpty(targetHdfsParameter.getFileType())){ - result.append(" ").append(targetHdfsParameter.getFileType()); + if (StringUtils.isNotEmpty(targetHdfsParameter.getFileType())) { + hdfsTargetSb.append(Constants.SPACE).append(targetHdfsParameter.getFileType()); } - if(targetHdfsParameter.isDeleteTargetDir()){ - result.append(" --delete-target-dir"); + if (targetHdfsParameter.isDeleteTargetDir()) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR); } - if(StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())){ - result.append(" --fields-terminated-by '").append(targetHdfsParameter.getFieldsTerminated()).append("'"); + if (StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY) + .append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES); } - if(StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())){ - result.append(" --lines-terminated-by '").append(targetHdfsParameter.getLinesTerminated()).append("'"); + if (StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY) + .append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES); } - result.append(" --null-non-string 'NULL' --null-string 'NULL'"); + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELD_NULL_PLACEHOLDER); } - }catch(Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop hdfs target params build failed: [%s]", e.getMessage())); } - return result.toString(); + return hdfsTargetSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java index 83c4123c45..69a95d008c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java @@ -14,14 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
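For the template import exercised in SqoopTaskTest, the hdfs target fragment built above comes out as:

    --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\n' --null-non-string 'NULL' --null-string 'NULL'

with the terminator characters single-quoted and the fixed NULL placeholders appended via FIELD_NULL_PLACEHOLDER.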
*/ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,57 +34,57 @@ import org.slf4j.LoggerFactory; */ public class HiveTargetGenerator implements ITargetGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(HiveTargetGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); + StringBuilder hiveTargetSb = new StringBuilder(); - try{ + try { TargetHiveParameter targetHiveParameter = - JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHiveParameter.class); - if(targetHiveParameter != null){ - - result.append(" --hive-import "); + JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHiveParameter.class); + if (null != targetHiveParameter) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_IMPORT); - if(StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())&& - StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())){ - result.append(" --hive-table ") - .append(targetHiveParameter.getHiveDatabase()) - .append(".") - .append(targetHiveParameter.getHiveTable()); + if (StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase()) + && StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_TABLE) + .append(Constants.SPACE).append(String.format("%s.%s", targetHiveParameter.getHiveDatabase(), + targetHiveParameter.getHiveTable())); } - if(targetHiveParameter.isCreateHiveTable()){ - result.append(" --create-hive-table"); + if (targetHiveParameter.isCreateHiveTable()) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.CREATE_HIVE_TABLE); } - if(targetHiveParameter.isDropDelimiter()){ - result.append(" --hive-drop-import-delims"); + if (targetHiveParameter.isDropDelimiter()) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DROP_IMPORT_DELIMS); } - if(targetHiveParameter.isHiveOverWrite()){ - result.append(" --hive-overwrite -delete-target-dir"); + if (targetHiveParameter.isHiveOverWrite()) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_OVERWRITE) + .append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR); } - if(StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())){ - result.append(" --hive-delims-replacement ").append(targetHiveParameter.getReplaceDelimiter()); + if (StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DELIMS_REPLACEMENT) + 
.append(Constants.SPACE).append(targetHiveParameter.getReplaceDelimiter()); } - if(StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())&& - StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())){ - result.append(" --hive-partition-key ") - .append(targetHiveParameter.getHivePartitionKey()) - .append(" --hive-partition-value ") - .append(targetHiveParameter.getHivePartitionValue()); + if (StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey()) + && StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_KEY) + .append(Constants.SPACE).append(targetHiveParameter.getHivePartitionKey()) + .append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_VALUE) + .append(Constants.SPACE).append(targetHiveParameter.getHivePartitionValue()); } } - }catch(Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop hive target params build failed: [%s]", e.getMessage())); } - return result.toString(); + return hiveTargetSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java index 406c6f661d..b8a32da6dd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java @@ -14,21 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. 
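Note that the old code appended the single literal " --hive-overwrite -delete-target-dir", in which the second option was missing a dash; the rewrite emits HIVE_OVERWRITE and DELETE_TARGET_DIR as two separate, correctly spelled options. With the flags set as in the test params, the hive target fragment reads roughly:

    --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16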
*/ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; -import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,59 +38,74 @@ import org.slf4j.LoggerFactory; */ public class MysqlTargetGenerator implements ITargetGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(MysqlTargetGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); - try{ + StringBuilder mysqlTargetSb = new StringBuilder(); + try { TargetMysqlParameter targetMysqlParameter = - JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetMysqlParameter.class); + JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class); SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext(); - if(targetMysqlParameter != null && targetMysqlParameter.getTargetDatasource() != 0){ + if (null != targetMysqlParameter && targetMysqlParameter.getTargetDatasource() != 0) { // get datasource BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getTargetType()), - sqoopTaskExecutionContext.getTargetConnectionParams()); - - if(baseDataSource != null){ - result.append(" --connect ") - .append(baseDataSource.getJdbcUrl()) - .append(" --username ") - .append(baseDataSource.getUser()) - .append(" --password ") - .append(baseDataSource.getPassword()) - .append(" --table ") - .append(targetMysqlParameter.getTargetTable()); - - if(StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())){ - result.append(" --columns ").append(targetMysqlParameter.getTargetColumns()); + sqoopTaskExecutionContext.getTargetConnectionParams()); + + if (null != baseDataSource) { + + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT) + .append(Constants.SPACE).append(baseDataSource.getJdbcUrl()) + .append(Constants.SPACE).append(SqoopConstants.DB_USERNAME) + .append(Constants.SPACE).append(baseDataSource.getUser()) + .append(Constants.SPACE).append(SqoopConstants.DB_PWD) + 
.append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES) + .append(Constants.SPACE).append(SqoopConstants.TABLE) + .append(Constants.SPACE).append(targetMysqlParameter.getTargetTable()); + + if (StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())) { + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS) + .append(Constants.SPACE).append(targetMysqlParameter.getTargetColumns()); } - if(StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())){ - result.append(" --fields-terminated-by '").append(targetMysqlParameter.getFieldsTerminated()).append("'"); + if (StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())) { + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY); + if (targetMysqlParameter.getFieldsTerminated().contains("'")) { + mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getFieldsTerminated()); + + } else { + mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES); + } } - if(StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())){ - result.append(" --lines-terminated-by '").append(targetMysqlParameter.getLinesTerminated()).append("'"); + if (StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())) { + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY); + if (targetMysqlParameter.getLinesTerminated().contains(Constants.SINGLE_QUOTES)) { + mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getLinesTerminated()); + } else { + mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES); + } } - if(targetMysqlParameter.getIsUpdate() - && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey()) - && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())){ - result.append(" --update-key ").append(targetMysqlParameter.getTargetUpdateKey()) - .append(" --update-mode ").append(targetMysqlParameter.getTargetUpdateMode()); + if (targetMysqlParameter.getIsUpdate() + && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey()) + && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())) { + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.UPDATE_KEY) + .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateKey()) + .append(Constants.SPACE).append(SqoopConstants.UPDATE_MODE) + .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateMode()); } } } - }catch (Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop mysql target params build failed: [%s]", e.getMessage())); } - return result.toString(); + return mysqlTargetSb.toString(); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index 2d0e39aa69..f187617d55 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
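This is where the special-character password fix lands for exports: the password is wrapped in DOUBLE_QUOTES, and the field/line terminators are only re-quoted when the user value does not already carry a quote. A before/after sketch with a hypothetical password:

    --password p@ss'word      (old: bare append, the shell mis-parses the quote)
    --password "p@ss'word"    (new: quoted as generated above)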
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop;

 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@@ -23,6 +24,9 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;

-import java.util.Date;

 /**
  * sqoop task test
  */
@@ -43,14 +46,12 @@ public class SqoopTaskTest {

     private static final Logger logger = LoggerFactory.getLogger(SqoopTaskTest.class);

-    private ProcessService processService;
-    private ApplicationContext applicationContext;
     private SqoopTask sqoopTask;

     @Before
-    public void before() throws Exception{
-        processService = Mockito.mock(ProcessService.class);
-        applicationContext = Mockito.mock(ApplicationContext.class);
+    public void before() {
+        ProcessService processService = Mockito.mock(ProcessService.class);
+        ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
         SpringApplicationContext springApplicationContext = new SpringApplicationContext();
         springApplicationContext.setApplicationContext(applicationContext);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
@@ -61,17 +62,17 @@ public class SqoopTaskTest {
         taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh");
         taskExecutionContext.setStartTime(new Date());
         taskExecutionContext.setTaskTimeout(0);
-        taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1," +
-                "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\"," +
-                "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\"," +
-                "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[]," +
-                "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\"" +
-                ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true," +
-                "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\"," +
-                "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
-
-        sqoopTask = new SqoopTask(taskExecutionContext,logger);
-        //test sqoop tash init method
+        taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+            + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\","
+            + "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\","
+            + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],"
+            + "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\""
+            + ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,"
+            + "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\","
+            + "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
+
+        sqoopTask = new SqoopTask(taskExecutionContext, logger);
+        //test sqoop task init method
         sqoopTask.init();
     }

@@ -79,40 +80,72 @@
     /**
      * test SqoopJobGenerator
      */
     @Test
-    public void testGenerator(){
+    public void testGenerator() {
         TaskExecutionContext mysqlTaskExecutionContext = getMysqlTaskExecutionContext();
         //sqoop TEMPLATE job
-        //import mysql to HDFS with hadoo
-        String mysqlToHdfs = "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]," +
-                "\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
-        SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs,SqoopParameters.class);
+        //import mysql to HDFS with hadoop
+        String mysqlToHdfs =
+            "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],"
+                + "\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+                + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\","
+                + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\","
+                + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\","
+                + "\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+                + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs, SqoopParameters.class);
         SqoopJobGenerator generator = new SqoopJobGenerator();
-        String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams,mysqlTaskExecutionContext);
-        String mysqlToHdfsExpected = "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
+        String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams, mysqlTaskExecutionContext);
+        String mysqlToHdfsExpected =
+            "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test "
+                + "--username kylo --password \"123456\" --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile "
+                + "--delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
         Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript);

         //export hdfs to mysql using update mode
-        String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\"," +
-                "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\"," +
-                "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
-        SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql,SqoopParameters.class);
-        String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams,mysqlTaskExecutionContext);
-        String hdfsToMysqlScriptExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
+        String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\","
+            + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\","
+            + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\","
+            + "\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\","
+            + "\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql, SqoopParameters.class);
+        String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams, mysqlTaskExecutionContext);
+        String hdfsToMysqlScriptExpected =
+            "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test "
+                + "--username kylo --password \"123456\" --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' "
+                + "--lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
         Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript);

         //export hive to mysql
-        String hiveToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
-        SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql,SqoopParameters.class);
-        String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams,mysqlTaskExecutionContext);
-        String hiveToMysqlExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'";
+        String hiveToMysql =
+            "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\","
+                + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\","
+                + "\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\","
+                + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\","
+                + "\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+                + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql, SqoopParameters.class);
+        String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams, mysqlTaskExecutionContext);
+        String hiveToMysqlExpected =
+            "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date "
+                + "--hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password \"123456\" --table person_3 "
+                + "--fields-terminated-by '@' --lines-terminated-by '\\n'";
         Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript);

         //import mysql to hive
-        String mysqlToHive = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
-        SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive,SqoopParameters.class);
-        String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams,mysqlTaskExecutionContext);
-        String mysqlToHiveExpected = "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
+        String mysqlToHive =
+            "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\","
+                + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\","
+                + "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],"
+                + "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\","
+                + "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,"
+                + "\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
+        SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
+        String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
+        String mysqlToHiveExpected =
+            "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password \"123456\" "
+                + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 "
+                + "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
         Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);

         //sqoop CUSTOM job
@@ -124,16 +157,18 @@
     }

-
     /**
      * get taskExecutionContext include mysql
+     *
      * @return TaskExecutionContext
      */
     private TaskExecutionContext getMysqlTaskExecutionContext() {
         TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
         SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
-        String mysqlSourceConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
-        String mysqlTargetConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+        String mysqlSourceConnectionParams =
+            "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+        String mysqlTargetConnectionParams =
+            "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
         sqoopTaskExecutionContext.setDataSourceId(2);
         sqoopTaskExecutionContext.setDataTargetId(2);
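+        // source and target both use datasource id 2, i.e. the same test MySQL instance: the connection params above are deliberately identical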
         sqoopTaskExecutionContext.setSourcetype(0);
@@ -153,7 +188,7 @@
      * Method: init
      */
     @Test
-    public void testInit(){
+    public void testInit() {
         try {
             sqoopTask.init();
         } catch (Exception e) {
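+            // init() is expected to succeed here; an exception would mean the fixture task parameters could not be parsed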