From 7720a773cfae7d7a4034036e901bbead4f6a8704 Mon Sep 17 00:00:00 2001 From: Yelli Date: Tue, 8 Dec 2020 15:09:02 +0800 Subject: [PATCH] chrry-pick sqoop task in dev to 1.3.4 (#4168) * sqoop task optimization * sqoop front-end optimization * modify sqoop task UT * add sqoop task UT to pom * sqoop task source type or target type is null throw exception * fix testSqoopTaskTest bug (#3024) * [FIX-4034][server] fix sqoop import fail (#4036) * fix #4043, sqoop import query fail * fix #4043, sqoop task hard code & code style * add license for SqoopConstants * add private constructor for SqoopConstants * fixed sqoop mysql pwd have special character * fix checkstyle * fix sqoop task log * remove unused constants * [FIX-4034][server] fix sqoop import fail (#4036) * fix #4043, sqoop import query fail * fix #4043, sqoop task hard code & code style * add license for SqoopConstants * add private constructor for SqoopConstants * fixed sqoop mysql pwd have special character * fix checkstyle * fix sqoop task log * remove unused constants * fix sqoop task jdbc string contains special char (#4105) * split sqoop import hive database and table (#4141) * modify JSONUtils Co-authored-by: CalvinKirs --- .../dolphinscheduler/api/enums/Status.java | 2 +- .../dolphinscheduler/common/Constants.java | 14 + .../common/enums/SqoopJobType.java | 41 + .../{QueryType.java => SqoopQueryType.java} | 30 +- .../common/task/sqoop/SqoopParameters.java | 99 +- .../sqoop/targets/TargetMysqlParameter.java | 2 +- .../consumer/TaskPriorityQueueConsumer.java | 33 +- .../worker/task/sqoop/SqoopConstants.java | 75 ++ .../server/worker/task/sqoop/SqoopTask.java | 50 +- .../task/sqoop/generator/CommonGenerator.java | 62 +- .../sqoop/generator/ISourceGenerator.java | 6 +- .../sqoop/generator/ITargetGenerator.java | 6 +- .../sqoop/generator/SqoopJobGenerator.java | 54 +- .../sources/HdfsSourceGenerator.java | 36 +- .../sources/HiveSourceGenerator.java | 47 +- .../sources/MysqlSourceGenerator.java | 112 +- .../targets/HdfsTargetGenerator.java | 56 +- .../targets/HiveTargetGenerator.java | 69 +- .../targets/MysqlTargetGenerator.java | 88 +- .../TaskPriorityQueueConsumerTest.java | 4 +- .../worker/task/sqoop/SqoopTaskTest.java | 180 ++-- .../formModel/tasks/_source/datasource.vue | 2 +- .../dag/_source/formModel/tasks/sqoop.vue | 962 +++++++++++------- .../src/js/module/i18n/locale/en_US.js | 7 + .../src/js/module/i18n/locale/zh_CN.js | 7 + pom.xml | 1 + 26 files changed, 1356 insertions(+), 689 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java rename dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/{QueryType.java => SqoopQueryType.java} (69%) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index f5390e99e2..fd6c7af44e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -217,7 +217,7 @@ public enum Status { DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"), DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"), PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"), - PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"), + PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node {0} parameter invalid", "流程节点[{0}]参数无效"), PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"), DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error", "删除工作流定义错误"), SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line", "调度配置[{0}]已上线"), diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index a35bc18ca9..8af62a4822 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -215,6 +215,11 @@ public final class Constants { */ public static final String COLON = ":"; + /** + * SPACE " " + */ + public static final String SPACE = " "; + /** * SINGLE_SLASH / */ @@ -225,6 +230,15 @@ public final class Constants { */ public static final String DOUBLE_SLASH = "//"; + /** + * SINGLE_QUOTES "'" + */ + public static final String SINGLE_QUOTES = "'"; + /** + * DOUBLE_QUOTES "\"" + */ + public static final String DOUBLE_QUOTES = "\""; + /** * SEMICOLON ; */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java new file mode 100644 index 0000000000..f1fde27928 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +public enum SqoopJobType { + CUSTOM(0, "CUSTOM"), + TEMPLATE(1, "TEMPLATE"); + + SqoopJobType(int code, String descp){ + this.code = code; + this.descp = descp; + } + + @EnumValue + private final int code; + private final String descp; + + public int getCode() { + return code; + } + + public String getDescp() { + return descp; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java similarity index 69% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java index 13820b4bab..4d279f5d8e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java @@ -14,20 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.enums; -public enum QueryType { +public enum SqoopQueryType { + + FORM(0, "SQOOP_QUERY_FORM"), + SQL(1, "SQOOP_QUERY_SQL"); + + private final Integer code; + + private final String desc; - FORM, - SQL; + SqoopQueryType(Integer code, String desc) { + this.code = code; + this.desc = desc; + } + + public Integer getCode() { + return code; + } - public static QueryType getEnum(int value){ - for (QueryType e:QueryType.values()) { - if(e.ordinal() == value) { - return e; - } - } - //For values out of enum scope - return null; + public String getDesc() { + return desc; } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java index 7f02f42387..8b566a8472 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.common.task.sqoop; +import org.apache.dolphinscheduler.common.enums.SqoopJobType; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -28,6 +30,23 @@ import java.util.List; */ public class SqoopParameters extends AbstractParameters { + /** + * sqoop job type: + * CUSTOM - custom sqoop job + * TEMPLATE - sqoop template job + */ + private String jobType; + + /** + * customJob eq 1, use customShell + */ + private String customShell; + + /** + * sqoop job name - map-reduce job name + */ + private String jobName; + /** * model type */ @@ -53,6 +72,16 @@ public class SqoopParameters extends AbstractParameters { */ private String targetParams; + /** + * hadoop custom param for sqoop job + */ + private List hadoopCustomParams; + + /** + * sqoop advanced param + */ + private List sqoopAdvancedParams; + public String getModelType() { return modelType; } @@ -101,18 +130,74 @@ public class SqoopParameters extends AbstractParameters { this.targetParams = targetParams; } + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public String getCustomShell() { + return customShell; + } + + public void setCustomShell(String customShell) { + this.customShell = customShell; + } + + public List getHadoopCustomParams() { + return hadoopCustomParams; + } + + public void setHadoopCustomParams(List hadoopCustomParams) { + this.hadoopCustomParams = hadoopCustomParams; + } + + public List getSqoopAdvancedParams() { + return sqoopAdvancedParams; + } + + public void setSqoopAdvancedParams(List sqoopAdvancedParams) { + this.sqoopAdvancedParams = sqoopAdvancedParams; + } + @Override public boolean checkParameters() { - return StringUtils.isNotEmpty(modelType)&& - concurrency != 0 && - StringUtils.isNotEmpty(sourceType)&& - StringUtils.isNotEmpty(targetType)&& - StringUtils.isNotEmpty(sourceParams)&& - StringUtils.isNotEmpty(targetParams); + + boolean sqoopParamsCheck = false; + + if (StringUtils.isEmpty(jobType)) { + return sqoopParamsCheck; + } + + if (SqoopJobType.TEMPLATE.getDescp().equals(jobType)) { + sqoopParamsCheck = StringUtils.isEmpty(customShell) && + StringUtils.isNotEmpty(modelType) && + StringUtils.isNotEmpty(jobName) && + concurrency != 0 && + StringUtils.isNotEmpty(sourceType) && + StringUtils.isNotEmpty(targetType) && + StringUtils.isNotEmpty(sourceParams) && + StringUtils.isNotEmpty(targetParams); + } else if (SqoopJobType.CUSTOM.getDescp().equals(jobType)) { + sqoopParamsCheck = StringUtils.isNotEmpty(customShell) && + StringUtils.isEmpty(jobName); + } + + return sqoopParamsCheck; } @Override public List getResourceFilesList() { - return new ArrayList<>(); + return new ArrayList<>(); } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java index 47126ae993..86931cbe46 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java @@ -106,7 +106,7 @@ public class TargetMysqlParameter { this.preQuery = preQuery; } - public boolean isUpdate() { + public boolean getIsUpdate() { return isUpdate; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 1a3bf6d02e..eef59e67c0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; @@ -275,29 +277,32 @@ public class TaskPriorityQueueConsumer extends Thread{ /** - * set datax task relation + * set sqoop task relation * @param sqoopTaskExecutionContext sqoopTaskExecutionContext * @param taskNode taskNode */ private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) { SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class); - SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class); - TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class); + // sqoop job type is template set task relation + if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) { + SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class); + TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class); - DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource()); - DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource()); + DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource()); + DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource()); - if (dataSource != null){ - sqoopTaskExecutionContext.setDataSourceId(dataSource.getId()); - sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); - sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); - } + if (dataSource != null){ + sqoopTaskExecutionContext.setDataSourceId(dataSource.getId()); + sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); + sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + } - if (dataTarget != null){ - sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId()); - sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); - sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + if (dataTarget != null){ + sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId()); + sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); + sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java new file mode 100644 index 0000000000..772807b4d2 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task.sqoop; + +public final class SqoopConstants { + + private SqoopConstants() { + } + + //sqoop general param + public static final String SQOOP = "sqoop"; + public static final String SQOOP_MR_JOB_NAME = "mapred.job.name"; + public static final String SQOOP_PARALLELISM = "-m"; + public static final String FIELDS_TERMINATED_BY = "--fields-terminated-by"; + public static final String LINES_TERMINATED_BY = "--lines-terminated-by"; + public static final String FIELD_NULL_PLACEHOLDER = "--null-non-string 'NULL' --null-string 'NULL'"; + + //sqoop db + public static final String DB_CONNECT = "--connect"; + public static final String DB_USERNAME = "--username"; + public static final String DB_PWD = "--password"; + public static final String TABLE = "--table"; + public static final String COLUMNS = "--columns"; + public static final String QUERY_WHERE = "where"; + public static final String QUERY = "--query"; + public static final String QUERY_CONDITION = "AND \\$CONDITIONS"; + public static final String QUERY_WITHOUT_CONDITION = "WHERE \\$CONDITIONS"; + public static final String MAP_COLUMN_HIVE = "--map-column-hive"; + public static final String MAP_COLUMN_JAVA = "--map-column-java"; + + + //sqoop hive source + public static final String HCATALOG_DATABASE = "--hcatalog-database"; + public static final String HCATALOG_TABLE = "--hcatalog-table"; + public static final String HCATALOG_PARTITION_KEYS = "--hcatalog-partition-keys"; + public static final String HCATALOG_PARTITION_VALUES = "--hcatalog-partition-values"; + + //sqoop hdfs + public static final String HDFS_EXPORT_DIR = "--export-dir"; + public static final String TARGET_DIR = "--target-dir"; + public static final String COMPRESSION_CODEC = "--compression-codec"; + + //sqoop hive + public static final String HIVE_IMPORT = "--hive-import"; + public static final String HIVE_DATABASE = "--hive-database"; + public static final String HIVE_TABLE = "--hive-table"; + public static final String CREATE_HIVE_TABLE = "--create-hive-table"; + public static final String HIVE_DROP_IMPORT_DELIMS = "--hive-drop-import-delims"; + public static final String HIVE_OVERWRITE = "--hive-overwrite"; + public static final String DELETE_TARGET_DIR = "--delete-target-dir"; + public static final String HIVE_DELIMS_REPLACEMENT = "--hive-delims-replacement"; + public static final String HIVE_PARTITION_KEY = "--hive-partition-key"; + public static final String HIVE_PARTITION_VALUE = "--hive-partition-value"; + + //sqoop update model + public static final String UPDATE_KEY = "--update-key"; + public static final String UPDATE_MODE = "--update-mode"; + + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java index 9f54d089be..00d94f01bf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java @@ -14,63 +14,73 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop; -import com.alibaba.fastjson.JSON; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; -import org.slf4j.Logger; + import java.util.Map; +import org.slf4j.Logger; + /** * sqoop task extends the shell task */ public class SqoopTask extends AbstractYarnTask { + /** + * sqoop task params + */ private SqoopParameters sqoopParameters; /** * taskExecutionContext */ - private TaskExecutionContext taskExecutionContext; + private final TaskExecutionContext sqoopTaskExecutionContext; - public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger){ - super(taskExecutionContext,logger); - this.taskExecutionContext = taskExecutionContext; + public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + this.sqoopTaskExecutionContext = taskExecutionContext; } @Override - public void init() throws Exception { - logger.info("sqoop task params {}", taskExecutionContext.getTaskParams()); + public void init() { + logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams()); sqoopParameters = - JSON.parseObject(taskExecutionContext.getTaskParams(),SqoopParameters.class); - if (!sqoopParameters.checkParameters()) { - throw new RuntimeException("sqoop task params is not valid"); + JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class); + //check sqoop task params + if (null == sqoopParameters) { + throw new IllegalArgumentException("Sqoop Task params is null"); } + if (!sqoopParameters.checkParameters()) { + throw new IllegalArgumentException("Sqoop Task params check fail"); + } } @Override - protected String buildCommand() throws Exception { + protected String buildCommand() { //get sqoop scripts SqoopJobGenerator generator = new SqoopJobGenerator(); - String script = generator.generateSqoopJob(sqoopParameters,taskExecutionContext); + String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext); - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - sqoopParameters.getLocalParametersMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()), + sqoopTaskExecutionContext.getDefinedParams(), + sqoopParameters.getLocalParametersMap(), + CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()), + sqoopTaskExecutionContext.getScheduleTime()); - if(paramsMap != null){ - String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); + if (paramsMap != null) { + String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); logger.info("sqoop script: {}", resultScripts); return resultScripts; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java index 4944bac5ba..e3e7c9a3ae 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java @@ -14,9 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; + +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,21 +33,53 @@ import org.slf4j.LoggerFactory; */ public class CommonGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(CommonGenerator.class); public String generate(SqoopParameters sqoopParameters) { - StringBuilder result = new StringBuilder(); - try{ - result.append("sqoop ") - .append(sqoopParameters.getModelType()); - if(sqoopParameters.getConcurrency() >0){ - result.append(" -m ") - .append(sqoopParameters.getConcurrency()); + + StringBuilder commonSb = new StringBuilder(); + + try { + //sqoop task model + commonSb.append(SqoopConstants.SQOOP) + .append(Constants.SPACE) + .append(sqoopParameters.getModelType()); + + //sqoop map-reduce job name + commonSb.append(Constants.SPACE).append(Constants.D).append(Constants.SPACE) + .append(String.format("%s%s%s", SqoopConstants.SQOOP_MR_JOB_NAME, + Constants.EQUAL_SIGN, sqoopParameters.getJobName())); + + //hadoop custom param + List hadoopCustomParams = sqoopParameters.getHadoopCustomParams(); + if (CollectionUtils.isNotEmpty(hadoopCustomParams)) { + for (Property hadoopCustomParam : hadoopCustomParams) { + String hadoopCustomParamStr = String.format("%s%s%s", hadoopCustomParam.getProp(), + Constants.EQUAL_SIGN, hadoopCustomParam.getValue()); + + commonSb.append(Constants.SPACE).append(Constants.D) + .append(Constants.SPACE).append(hadoopCustomParamStr); + } + } + + //sqoop custom params + List sqoopAdvancedParams = sqoopParameters.getSqoopAdvancedParams(); + if (CollectionUtils.isNotEmpty(sqoopAdvancedParams)) { + for (Property sqoopAdvancedParam : sqoopAdvancedParams) { + commonSb.append(Constants.SPACE).append(sqoopAdvancedParam.getProp()) + .append(Constants.SPACE).append(sqoopAdvancedParam.getValue()); + } + } + + //sqoop parallelism + if (sqoopParameters.getConcurrency() > 0) { + commonSb.append(Constants.SPACE).append(SqoopConstants.SQOOP_PARALLELISM) + .append(Constants.SPACE).append(sqoopParameters.getConcurrency()); } - }catch (Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop task general param build failed: [%s]", e.getMessage())); } - return result.toString(); + return commonSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java index 841654b699..e6a9576a25 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; @@ -26,9 +27,10 @@ public interface ISourceGenerator { /** * generate the source script - * @param sqoopParameters sqoopParameters + * + * @param sqoopParameters sqoopParameters * @param taskExecutionContext taskExecutionContext * @return source script */ - String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext); + String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java index 7bdaf49e83..eb355a7438 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; @@ -26,9 +27,10 @@ public interface ITargetGenerator { /** * generate the target script - * @param sqoopParameters sqoopParameters + * + * @param sqoopParameters sqoopParameters * @param taskExecutionContext taskExecutionContext * @return target script */ - String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext); + String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java index 4e9cb84ff3..9feaffa5c6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java @@ -14,8 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; +import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HdfsSourceGenerator; @@ -45,40 +47,51 @@ public class SqoopJobGenerator { /** * common script generator */ - private CommonGenerator commonGenerator; + private final CommonGenerator commonGenerator; - public SqoopJobGenerator(){ + public SqoopJobGenerator() { commonGenerator = new CommonGenerator(); } - private void createSqoopJobGenerator(String sourceType,String targetType){ + private void createSqoopJobGenerator(String sourceType, String targetType) { sourceGenerator = createSourceGenerator(sourceType); targetGenerator = createTargetGenerator(targetType); } /** * get the final sqoop scripts - * @param sqoopParameters - * @return + * + * @param sqoopParameters sqoop params + * @return sqoop scripts */ - public String generateSqoopJob(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext){ - createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType()); - if(sourceGenerator == null || targetGenerator == null){ - return null; + public String generateSqoopJob(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { + + String sqoopScripts = ""; + + if (SqoopJobType.TEMPLATE.getDescp().equals(sqoopParameters.getJobType())) { + createSqoopJobGenerator(sqoopParameters.getSourceType(), sqoopParameters.getTargetType()); + if (sourceGenerator == null || targetGenerator == null) { + throw new RuntimeException("sqoop task source type or target type is null"); + } + + sqoopScripts = String.format("%s%s%s", commonGenerator.generate(sqoopParameters), + sourceGenerator.generate(sqoopParameters, taskExecutionContext), + targetGenerator.generate(sqoopParameters, taskExecutionContext)); + } else if (SqoopJobType.CUSTOM.getDescp().equals(sqoopParameters.getJobType())) { + sqoopScripts = sqoopParameters.getCustomShell().replaceAll("\\r\\n", "\n"); } - return commonGenerator.generate(sqoopParameters) - + sourceGenerator.generate(sqoopParameters,taskExecutionContext) - + targetGenerator.generate(sqoopParameters,taskExecutionContext); + return sqoopScripts; } /** * get the source generator - * @param sourceType - * @return + * + * @param sourceType sqoop source type + * @return sqoop source generator */ - private ISourceGenerator createSourceGenerator(String sourceType){ - switch (sourceType){ + private ISourceGenerator createSourceGenerator(String sourceType) { + switch (sourceType) { case MYSQL: return new MysqlSourceGenerator(); case HIVE: @@ -92,11 +105,12 @@ public class SqoopJobGenerator { /** * get the target generator - * @param targetType - * @return + * + * @param targetType sqoop target type + * @return sqoop target generator */ - private ITargetGenerator createTargetGenerator(String targetType){ - switch (targetType){ + private ITargetGenerator createTargetGenerator(String targetType) { + switch (targetType) { case MYSQL: return new MysqlTargetGenerator(); case HIVE: diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java index 41e56682ae..549d5dba63 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java @@ -14,14 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,28 +34,30 @@ import org.slf4j.LoggerFactory; */ public class HdfsSourceGenerator implements ISourceGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(HdfsSourceGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); - try{ + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { + + StringBuilder hdfsSourceSb = new StringBuilder(); + + try { SourceHdfsParameter sourceHdfsParameter - = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHdfsParameter.class); - - if(sourceHdfsParameter != null){ - if(StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())){ - result.append(" --export-dir ") - .append(sourceHdfsParameter.getExportDir()); - }else{ - throw new Exception("--export-dir is null"); + = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHdfsParameter.class); + + if (null != sourceHdfsParameter) { + if (StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())) { + hdfsSourceSb.append(Constants.SPACE).append(SqoopConstants.HDFS_EXPORT_DIR) + .append(Constants.SPACE).append(sourceHdfsParameter.getExportDir()); + } else { + throw new IllegalArgumentException("Sqoop hdfs export dir is null"); } } - }catch (Exception e){ - logger.error("get hdfs source failed",e); + } catch (Exception e) { + logger.error(String.format("Sqoop hdfs source parmas build failed: [%s]", e.getMessage())); } - return result.toString(); + return hdfsSourceSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java index ea12616825..3229dcada7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java @@ -14,14 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,33 +34,40 @@ import org.slf4j.LoggerFactory; */ public class HiveSourceGenerator implements ISourceGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(HiveSourceGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { - StringBuilder sb = new StringBuilder(); - try{ + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { + + StringBuilder hiveSourceSb = new StringBuilder(); + + try { SourceHiveParameter sourceHiveParameter - = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHiveParameter.class); - if(sourceHiveParameter != null){ - if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())){ - sb.append(" --hcatalog-database ").append(sourceHiveParameter.getHiveDatabase()); + = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHiveParameter.class); + + if (null != sourceHiveParameter) { + if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())) { + hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_DATABASE) + .append(Constants.SPACE).append(sourceHiveParameter.getHiveDatabase()); } - if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())){ - sb.append(" --hcatalog-table ").append(sourceHiveParameter.getHiveTable()); + if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())) { + hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_TABLE) + .append(Constants.SPACE).append(sourceHiveParameter.getHiveTable()); } - if(StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())&& - StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())){ - sb.append(" --hcatalog-partition-keys ").append(sourceHiveParameter.getHivePartitionKey()) - .append(" --hcatalog-partition-values ").append(sourceHiveParameter.getHivePartitionValue()); + if (StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey()) + && StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())) { + hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_KEYS) + .append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionKey()) + .append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_VALUES) + .append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionValue()); } } - }catch (Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop hive source params build failed: [%s]", e.getMessage())); } - return sb.toString(); + return hiveSourceSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java index 8cd5357bf3..58e508ac6f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java @@ -14,106 +14,118 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbType; -import org.apache.dolphinscheduler.common.enums.QueryType; +import org.apache.dolphinscheduler.common.enums.SqoopQueryType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * mysql source generator */ public class MysqlSourceGenerator implements ISourceGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(MysqlSourceGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); - try { - SourceMysqlParameter sourceMysqlParameter - = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class); + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { + + StringBuilder mysqlSourceSb = new StringBuilder(); + try { + SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class); SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext(); - if(sourceMysqlParameter != null){ + if (null != sourceMysqlParameter) { BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getSourcetype()), - sqoopTaskExecutionContext.getSourceConnectionParams()); - if(baseDataSource != null){ - result.append(" --connect ") - .append(baseDataSource.getJdbcUrl()) - .append(" --username ") - .append(baseDataSource.getUser()) - .append(" --password ") - .append(baseDataSource.getPassword()); - - if(sourceMysqlParameter.getSrcQueryType() == QueryType.FORM.ordinal()){ - if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())){ - result.append(" --table ").append(sourceMysqlParameter.getSrcTable()); + sqoopTaskExecutionContext.getSourceConnectionParams()); + + if (null != baseDataSource) { + + mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT) + .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getJdbcUrl()).append(Constants.DOUBLE_QUOTES) + .append(Constants.SPACE).append(SqoopConstants.DB_USERNAME) + .append(Constants.SPACE).append(baseDataSource.getUser()) + .append(Constants.SPACE).append(SqoopConstants.DB_PWD) + .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES); + + //sqoop table & sql query + if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.FORM.getCode()) { + if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())) { + mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.TABLE) + .append(Constants.SPACE).append(sourceMysqlParameter.getSrcTable()); } - if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())){ - result.append(" --columns ").append(sourceMysqlParameter.getSrcColumns()); + if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())) { + mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS) + .append(Constants.SPACE).append(sourceMysqlParameter.getSrcColumns()); } + } else if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.SQL.getCode() + && StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())) { - }else if(sourceMysqlParameter.getSrcQueryType() == QueryType.SQL.ordinal() - && StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())){ String srcQuery = sourceMysqlParameter.getSrcQuerySql(); - if(srcQuery.toLowerCase().contains("where")){ - srcQuery += " AND "+"$CONDITIONS"; - }else{ - srcQuery += " WHERE $CONDITIONS"; - } - result.append(" --query \'").append(srcQuery).append("\'"); + mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY) + .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(srcQuery); + if (srcQuery.toLowerCase().contains(SqoopConstants.QUERY_WHERE)) { + mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_CONDITION).append(Constants.DOUBLE_QUOTES); + } else { + mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_WITHOUT_CONDITION).append(Constants.DOUBLE_QUOTES); + } } - List mapColumnHive = sourceMysqlParameter.getMapColumnHive(); + //sqoop hive map column + List mapColumnHive = sourceMysqlParameter.getMapColumnHive(); - if(mapColumnHive != null && !mapColumnHive.isEmpty()){ + if (null != mapColumnHive && !mapColumnHive.isEmpty()) { StringBuilder columnMap = new StringBuilder(); - for(Property item:mapColumnHive){ - columnMap.append(item.getProp()).append("=").append(item.getValue()).append(","); + for (Property item : mapColumnHive) { + columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA); } - if(StringUtils.isNotEmpty(columnMap.toString())){ - result.append(" --map-column-hive ") - .append(columnMap.substring(0,columnMap.length() - 1)); + if (StringUtils.isNotEmpty(columnMap.toString())) { + mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_HIVE) + .append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1)); } } - List mapColumnJava = sourceMysqlParameter.getMapColumnJava(); + //sqoop map column java + List mapColumnJava = sourceMysqlParameter.getMapColumnJava(); - if(mapColumnJava != null && !mapColumnJava.isEmpty()){ + if (null != mapColumnJava && !mapColumnJava.isEmpty()) { StringBuilder columnMap = new StringBuilder(); - for(Property item:mapColumnJava){ - columnMap.append(item.getProp()).append("=").append(item.getValue()).append(","); + for (Property item : mapColumnJava) { + columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA); } - if(StringUtils.isNotEmpty(columnMap.toString())){ - result.append(" --map-column-java ") - .append(columnMap.substring(0,columnMap.length() - 1)); + if (StringUtils.isNotEmpty(columnMap.toString())) { + mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_JAVA) + .append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1)); } } } } - }catch (Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop task mysql source params build failed: [%s]", e.getMessage())); } - return result.toString(); + return mysqlSourceSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java index 64ea75e742..3ea32546ec 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java @@ -14,14 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,47 +34,53 @@ import org.slf4j.LoggerFactory; */ public class HdfsTargetGenerator implements ITargetGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(HdfsTargetGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); - try{ + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { + + StringBuilder hdfsTargetSb = new StringBuilder(); + + try { TargetHdfsParameter targetHdfsParameter = - JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHdfsParameter.class); + JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHdfsParameter.class); - if(targetHdfsParameter != null){ + if (null != targetHdfsParameter) { - if(StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())){ - result.append(" --target-dir ").append(targetHdfsParameter.getTargetPath()); + if (StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.TARGET_DIR) + .append(Constants.SPACE).append(targetHdfsParameter.getTargetPath()); } - if(StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())){ - result.append(" --compression-codec ").append(targetHdfsParameter.getCompressionCodec()); + if (StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.COMPRESSION_CODEC) + .append(Constants.SPACE).append(targetHdfsParameter.getCompressionCodec()); } - if(StringUtils.isNotEmpty(targetHdfsParameter.getFileType())){ - result.append(" ").append(targetHdfsParameter.getFileType()); + if (StringUtils.isNotEmpty(targetHdfsParameter.getFileType())) { + hdfsTargetSb.append(Constants.SPACE).append(targetHdfsParameter.getFileType()); } - if(targetHdfsParameter.isDeleteTargetDir()){ - result.append(" --delete-target-dir"); + if (targetHdfsParameter.isDeleteTargetDir()) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR); } - if(StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())){ - result.append(" --fields-terminated-by '").append(targetHdfsParameter.getFieldsTerminated()).append("'"); + if (StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY) + .append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES); } - if(StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())){ - result.append(" --lines-terminated-by '").append(targetHdfsParameter.getLinesTerminated()).append("'"); + if (StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())) { + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY) + .append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES); } - result.append(" --null-non-string 'NULL' --null-string 'NULL'"); + hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELD_NULL_PLACEHOLDER); } - }catch(Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop hdfs target params build failed: [%s]", e.getMessage())); } - return result.toString(); + return hdfsTargetSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java index dc5440b529..769fc62f25 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java @@ -14,14 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,57 +34,58 @@ import org.slf4j.LoggerFactory; */ public class HiveTargetGenerator implements ITargetGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(HiveTargetGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); + StringBuilder hiveTargetSb = new StringBuilder(); - try{ + try { TargetHiveParameter targetHiveParameter = - JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHiveParameter.class); - if(targetHiveParameter != null){ - - result.append(" --hive-import "); + JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHiveParameter.class); + if (null != targetHiveParameter) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_IMPORT); - if(StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())&& - StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())){ - result.append(" --hive-table ") - .append(targetHiveParameter.getHiveDatabase()) - .append(".") - .append(targetHiveParameter.getHiveTable()); + if (StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase()) + && StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DATABASE) + .append(Constants.SPACE).append(targetHiveParameter.getHiveDatabase()) + .append(Constants.SPACE).append(SqoopConstants.HIVE_TABLE) + .append(Constants.SPACE).append(targetHiveParameter.getHiveTable()); } - if(targetHiveParameter.isCreateHiveTable()){ - result.append(" --create-hive-table"); + if (targetHiveParameter.isCreateHiveTable()) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.CREATE_HIVE_TABLE); } - if(targetHiveParameter.isDropDelimiter()){ - result.append(" --hive-drop-import-delims"); + if (targetHiveParameter.isDropDelimiter()) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DROP_IMPORT_DELIMS); } - if(targetHiveParameter.isHiveOverWrite()){ - result.append(" --hive-overwrite -delete-target-dir"); + if (targetHiveParameter.isHiveOverWrite()) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_OVERWRITE) + .append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR); } - if(StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())){ - result.append(" --hive-delims-replacement ").append(targetHiveParameter.getReplaceDelimiter()); + if (StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DELIMS_REPLACEMENT) + .append(Constants.SPACE).append(targetHiveParameter.getReplaceDelimiter()); } - if(StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())&& - StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())){ - result.append(" --hive-partition-key ") - .append(targetHiveParameter.getHivePartitionKey()) - .append(" --hive-partition-value ") - .append(targetHiveParameter.getHivePartitionValue()); + if (StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey()) + && StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())) { + hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_KEY) + .append(Constants.SPACE).append(targetHiveParameter.getHivePartitionKey()) + .append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_VALUE) + .append(Constants.SPACE).append(targetHiveParameter.getHivePartitionValue()); } } - }catch(Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop hive target params build failed: [%s]", e.getMessage())); } - return result.toString(); + return hiveTargetSb.toString(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java index aed8b9e24a..4384c96479 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java @@ -14,21 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; -import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,59 +38,74 @@ import org.slf4j.LoggerFactory; */ public class MysqlTargetGenerator implements ITargetGenerator { - private Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger logger = LoggerFactory.getLogger(MysqlTargetGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) { - StringBuilder result = new StringBuilder(); - try{ + StringBuilder mysqlTargetSb = new StringBuilder(); + try { TargetMysqlParameter targetMysqlParameter = - JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetMysqlParameter.class); + JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class); SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext(); - if(targetMysqlParameter != null && targetMysqlParameter.getTargetDatasource() != 0){ + if (null != targetMysqlParameter && targetMysqlParameter.getTargetDatasource() != 0) { // get datasource BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getTargetType()), - sqoopTaskExecutionContext.getTargetConnectionParams()); - - if(baseDataSource != null){ - result.append(" --connect ") - .append(baseDataSource.getJdbcUrl()) - .append(" --username ") - .append(baseDataSource.getUser()) - .append(" --password ") - .append(baseDataSource.getPassword()) - .append(" --table ") - .append(targetMysqlParameter.getTargetTable()); - - if(StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())){ - result.append(" --columns ").append(targetMysqlParameter.getTargetColumns()); + sqoopTaskExecutionContext.getTargetConnectionParams()); + + if (null != baseDataSource) { + + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT) + .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getJdbcUrl()).append(Constants.DOUBLE_QUOTES) + .append(Constants.SPACE).append(SqoopConstants.DB_USERNAME) + .append(Constants.SPACE).append(baseDataSource.getUser()) + .append(Constants.SPACE).append(SqoopConstants.DB_PWD) + .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES) + .append(Constants.SPACE).append(SqoopConstants.TABLE) + .append(Constants.SPACE).append(targetMysqlParameter.getTargetTable()); + + if (StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())) { + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS) + .append(Constants.SPACE).append(targetMysqlParameter.getTargetColumns()); } - if(StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())){ - result.append(" --fields-terminated-by '").append(targetMysqlParameter.getFieldsTerminated()).append("'"); + if (StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())) { + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY); + if (targetMysqlParameter.getFieldsTerminated().contains("'")) { + mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getFieldsTerminated()); + + } else { + mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES); + } } - if(StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())){ - result.append(" --lines-terminated-by '").append(targetMysqlParameter.getLinesTerminated()).append("'"); + if (StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())) { + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY); + if (targetMysqlParameter.getLinesTerminated().contains(Constants.SINGLE_QUOTES)) { + mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getLinesTerminated()); + } else { + mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES); + } } - if(targetMysqlParameter.isUpdate() - && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey()) - && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())){ - result.append(" --update-key ").append(targetMysqlParameter.getTargetUpdateKey()) - .append(" --update-mode ").append(targetMysqlParameter.getTargetUpdateMode()); + if (targetMysqlParameter.getIsUpdate() + && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey()) + && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())) { + mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.UPDATE_KEY) + .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateKey()) + .append(Constants.SPACE).append(SqoopConstants.UPDATE_MODE) + .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateMode()); } } } - }catch (Exception e){ - logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(String.format("Sqoop mysql target params build failed: [%s]", e.getMessage())); } - return result.toString(); + return mysqlTargetSb.toString(); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index b247dc41e6..a95acf831e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; -import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Before; @@ -233,8 +232,7 @@ public class TaskPriorityQueueConsumerTest { dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); dataSource.setCreateTime(new Date()); dataSource.setUpdateTime(new Date()); - - Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource); + Mockito.doReturn(dataSource).when(processService).findDataSourceById(1); Thread.sleep(10000); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index bfc8205c2d..01dee2b001 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -14,17 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.sqoop; -import com.alibaba.fastjson.JSON; -import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; -import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Date; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -35,8 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import java.util.*; - /** * sqoop task test */ @@ -45,71 +45,137 @@ public class SqoopTaskTest { private static final Logger logger = LoggerFactory.getLogger(SqoopTaskTest.class); - private ProcessService processService; - private ApplicationContext applicationContext; private SqoopTask sqoopTask; @Before - public void before() throws Exception{ - processService = Mockito.mock(ProcessService.class); - Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); - applicationContext = Mockito.mock(ApplicationContext.class); + public void before() { + ProcessService processService = Mockito.mock(ProcessService.class); + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); SpringApplicationContext springApplicationContext = new SpringApplicationContext(); springApplicationContext.setApplicationContext(applicationContext); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); - TaskProps props = new TaskProps(); - props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTenantCode("1"); - props.setEnvFile(".dolphinscheduler_env.sh"); - props.setTaskStartTime(new Date()); - props.setTaskTimeout(0); - props.setTaskParams("{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"); - - sqoopTask = new SqoopTask(new TaskExecutionContext(),logger); + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskAppId(String.valueOf(System.currentTimeMillis())); + taskExecutionContext.setTenantCode("1"); + taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh"); + taskExecutionContext.setStartTime(new Date()); + taskExecutionContext.setTaskTimeout(0); + taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1," + + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\"," + + "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\"," + + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[]," + + "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\"" + + ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true," + + "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\"," + + "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"); + + sqoopTask = new SqoopTask(taskExecutionContext, logger); + //test sqoop task init method sqoopTask.init(); } + /** + * test SqoopJobGenerator + */ @Test - public void testGenerator(){ - String data1 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; - SqoopParameters sqoopParameters1 = JSON.parseObject(data1,SqoopParameters.class); - + public void testGenerator() { + TaskExecutionContext mysqlTaskExecutionContext = getMysqlTaskExecutionContext(); + + //sqoop TEMPLATE job + //import mysql to HDFS with hadoop + String mysqlToHdfs = + "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}]," + + "\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"jobType\":\"TEMPLATE\",\"concurrency\":1," + + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\"," + + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\"," + + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\"," + + "\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\"," + + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; + SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs, SqoopParameters.class); SqoopJobGenerator generator = new SqoopJobGenerator(); - String script = generator.generateSqoopJob(sqoopParameters1,new TaskExecutionContext()); - String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'"; - Assert.assertEquals(expected, script); - - String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; - SqoopParameters sqoopParameters2 = JSON.parseObject(data2,SqoopParameters.class); - - String script2 = generator.generateSqoopJob(sqoopParameters2,new TaskExecutionContext()); - String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert"; - Assert.assertEquals(expected2, script2); - - String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; - SqoopParameters sqoopParameters3 = JSON.parseObject(data3,SqoopParameters.class); - - String script3 = generator.generateSqoopJob(sqoopParameters3,new TaskExecutionContext()); - String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'"; - Assert.assertEquals(expected3, script3); - - String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; - SqoopParameters sqoopParameters4 = JSON.parseObject(data4,SqoopParameters.class); - - String script4 = generator.generateSqoopJob(sqoopParameters4,new TaskExecutionContext()); - String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; - Assert.assertEquals(expected4, script4); + String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams, mysqlTaskExecutionContext); + String mysqlToHdfsExpected = + "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" " + + "--username kylo --password \"123456\" --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile " + + "--delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'"; + Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript); + + //export hdfs to mysql using update mode + String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\"," + + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\"," + + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\"," + + "\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\"," + + "\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; + SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql, SqoopParameters.class); + String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams, mysqlTaskExecutionContext); + String hdfsToMysqlScriptExpected = + "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect \"jdbc:mysql://192.168.0.111:3306/test\" " + + "--username kylo --password \"123456\" --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' " + + "--lines-terminated-by '\\n' --update-key id --update-mode allowinsert"; + Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript); + + //export hive to mysql + String hiveToMysql = + "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\"," + + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\"," + + "\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\"," + + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\"," + + "\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\"," + + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; + SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql, SqoopParameters.class); + String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams, mysqlTaskExecutionContext); + String hiveToMysqlExpected = + "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date " + + "--hcatalog-partition-values 2020-02-17 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" --table person_3 " + + "--fields-terminated-by '@' --lines-terminated-by '\\n'"; + Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript); + + //import mysql to hive + String mysqlToHive = + "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\"," + + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\"," + + "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[]," + + "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\"," + + "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false," + + "\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; + SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class); + String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext); + String mysqlToHiveExpected = + "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" " + + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-database stg --hive-table person_internal_2 " + + "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; + Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript); + + //sqoop CUSTOM job + String sqoopCustomString = "{\"jobType\":\"CUSTOM\",\"localParams\":[],\"customShell\":\"sqoop import\"}"; + SqoopParameters sqoopCustomParams = JSONUtils.parseObject(sqoopCustomString, SqoopParameters.class); + String sqoopCustomScript = generator.generateSqoopJob(sqoopCustomParams, new TaskExecutionContext()); + String sqoopCustomExpected = "sqoop import"; + Assert.assertEquals(sqoopCustomExpected, sqoopCustomScript); } - private DataSource getDataSource() { - DataSource dataSource = new DataSource(); - dataSource.setType(DbType.MYSQL); - dataSource.setConnectionParams( - "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}"); - dataSource.setUserId(1); - return dataSource; + /** + * get taskExecutionContext include mysql + * + * @return TaskExecutionContext + */ + private TaskExecutionContext getMysqlTaskExecutionContext() { + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext(); + String mysqlSourceConnectionParams = + "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}"; + String mysqlTargetConnectionParams = + "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}"; + sqoopTaskExecutionContext.setDataSourceId(2); + sqoopTaskExecutionContext.setDataTargetId(2); + sqoopTaskExecutionContext.setSourcetype(0); + sqoopTaskExecutionContext.setTargetConnectionParams(mysqlTargetConnectionParams); + sqoopTaskExecutionContext.setSourceConnectionParams(mysqlSourceConnectionParams); + sqoopTaskExecutionContext.setTargetType(0); + taskExecutionContext.setSqoopTaskExecutionContext(sqoopTaskExecutionContext); + return taskExecutionContext; } @Test @@ -121,7 +187,7 @@ public class SqoopTaskTest { * Method: init */ @Test - public void testInit(){ + public void testInit() { try { sqoopTask.init(); } catch (Exception e) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue index a173139d15..05e248f518 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue @@ -131,7 +131,7 @@ }, created () { let supportType = this.supportType || [] - this.typeList = _.cloneDeep(this.store.state.dag.dsTypeListS) + this.typeList = this.data.typeList || _.cloneDeep(this.store.state.dag.dsTypeListS) // Have a specified data source if (supportType.length) { let is = (type) => { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue index 2ff3dbf6fd..ce1a77082e 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue @@ -1,134 +1,194 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/