diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index ae7300f54d..7ed6addcb5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -218,7 +218,7 @@ public enum Status {
     DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"),
     DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"),
     PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"),
-    PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"),
+    PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node {0} parameter invalid", "流程节点[{0}]参数无效"),
     PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"),
     DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error", "删除工作流定义错误"),
     SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line", "调度配置[{0}]已上线"),
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java
new file mode 100644
index 0000000000..f1fde27928
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+public enum SqoopJobType {
+    CUSTOM(0, "CUSTOM"),
+    TEMPLATE(1, "TEMPLATE");
+
+    SqoopJobType(int code, String descp){
+        this.code = code;
+        this.descp = descp;
+    }
+
+    @EnumValue
+    private final int code;
+    private final String descp;
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDescp() {
+        return descp;
+    }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
index 7f02f42387..8b566a8472 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
@@ -16,6 +16,8 @@
  */
 package org.apache.dolphinscheduler.common.task.sqoop;
 
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -28,6 +30,23 @@ import java.util.List;
  */
 public class SqoopParameters extends AbstractParameters {
 
+    /**
+     * sqoop job type:
+     * CUSTOM - custom sqoop job
+     * TEMPLATE - sqoop template job
+     */
+    private String jobType;
+
+    /**
+     * custom shell script, used when jobType is CUSTOM
+     */
+    private String customShell;
+
+    /**
+     * sqoop job name - map-reduce job name
+     */
+    private String jobName;
+
     /**
      * model type
      */
@@ -53,6 +72,16 @@ public class SqoopParameters extends AbstractParameters {
      */
     private String targetParams;
 
+    /**
+     * hadoop custom param for sqoop job
+     */
+    private List<Property> hadoopCustomParams;
+
+    /**
+     * sqoop advanced param
+     */
+    private List<Property> sqoopAdvancedParams;
+
     public String getModelType() {
         return modelType;
     }
@@ -101,18 +130,74 @@ public class SqoopParameters extends AbstractParameters {
         this.targetParams = targetParams;
     }
 
+    public String getJobType() {
+        return jobType;
+    }
+
+    public void setJobType(String jobType) {
+        this.jobType = jobType;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getCustomShell() {
+        return customShell;
+    }
+
+    public void setCustomShell(String customShell) {
+        this.customShell = customShell;
+    }
+
+    public List<Property> getHadoopCustomParams() {
+        return hadoopCustomParams;
+    }
+
+    public void setHadoopCustomParams(List<Property> hadoopCustomParams) {
+        this.hadoopCustomParams = hadoopCustomParams;
+    }
+
+    public List<Property> getSqoopAdvancedParams() {
+        return sqoopAdvancedParams;
+    }
+
+    public void setSqoopAdvancedParams(List<Property> sqoopAdvancedParams) {
+        this.sqoopAdvancedParams = sqoopAdvancedParams;
+    }
+
     @Override
     public boolean checkParameters() {
-        return StringUtils.isNotEmpty(modelType)&&
-                concurrency != 0 &&
-                StringUtils.isNotEmpty(sourceType)&&
-                StringUtils.isNotEmpty(targetType)&&
-                StringUtils.isNotEmpty(sourceParams)&&
-                StringUtils.isNotEmpty(targetParams);
+
+        boolean sqoopParamsCheck = false;
+
+        if (StringUtils.isEmpty(jobType)) {
+            return sqoopParamsCheck;
+        }
+
+        if (SqoopJobType.TEMPLATE.getDescp().equals(jobType)) {
+            sqoopParamsCheck = StringUtils.isEmpty(customShell) &&
+                    StringUtils.isNotEmpty(modelType) &&
+                    StringUtils.isNotEmpty(jobName) &&
+                    concurrency != 0 &&
+                    StringUtils.isNotEmpty(sourceType) &&
+                    StringUtils.isNotEmpty(targetType) &&
+                    StringUtils.isNotEmpty(sourceParams) &&
+                    StringUtils.isNotEmpty(targetParams);
+        } else if (SqoopJobType.CUSTOM.getDescp().equals(jobType)) {
+            sqoopParamsCheck = StringUtils.isNotEmpty(customShell) &&
+                    StringUtils.isEmpty(jobName);
+        }
+
+        return sqoopParamsCheck;
     }
 
     @Override
     public List<ResourceInfo> getResourceFilesList() {
-        return new ArrayList<>();
+        return new ArrayList<>();
     }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java
index 47126ae993..86931cbe46 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java
@@ -106,7 +106,7 @@ public class TargetMysqlParameter {
         this.preQuery = preQuery;
     }
 
-    public boolean isUpdate() {
+    public boolean getIsUpdate() {
         return isUpdate;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 3314789fdb..da2363fc81 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
 
 import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UdfType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -258,29 +259,32 @@ public class TaskPriorityQueueConsumer extends Thread{
 
     /**
-     * set datax task relation
+     * set sqoop task relation
      * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
      * @param taskNode taskNode
      */
     private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
         SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class);
 
-        SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
-        TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
+        // set the task relation only when the sqoop job type is TEMPLATE
+        if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
+            SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
+            TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
 
-        DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
-        DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
+            DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
+            DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
 
-        if (dataSource != null){
-            sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
-            sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-            sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
-        }
+            if (dataSource != null){
+                sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
+                sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+                sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+            }
 
-        if (dataTarget != null){
-            sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
-            sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-            sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            if (dataTarget != null){
+                sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
+                sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+                sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            }
         }
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
index 4944bac5ba..ffca73544d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
@@ -16,10 +16,17 @@
  */
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
 
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
+
 /**
  * common script generator
  */
@@ -32,6 +39,38 @@ public class CommonGenerator {
         try{
             result.append("sqoop ")
                     .append(sqoopParameters.getModelType());
+
+            //set sqoop job name
+            result.append(" -D mapred.job.name")
+                    .append(Constants.EQUAL_SIGN)
+                    .append(sqoopParameters.getJobName());
+
+            //set hadoop custom param
+            List<Property> hadoopCustomParams = sqoopParameters.getHadoopCustomParams();
+            if (CollectionUtils.isNotEmpty(hadoopCustomParams)) {
+                for (Property hadoopCustomParam : hadoopCustomParams) {
+                    String hadoopCustomParamStr = " -D " + hadoopCustomParam.getProp() +
+                            Constants.EQUAL_SIGN + hadoopCustomParam.getValue();
+
+                    if (StringUtils.isNotEmpty(hadoopCustomParamStr)) {
+                        result.append(hadoopCustomParamStr);
+                    }
+                }
+            }
+
+            //set sqoop advanced custom param
+            List<Property> sqoopAdvancedParams = sqoopParameters.getSqoopAdvancedParams();
+            if (CollectionUtils.isNotEmpty(sqoopAdvancedParams)) {
+
+                for (Property sqoopAdvancedParam : sqoopAdvancedParams) {
+                    String sqoopAdvancedParamStr = " " + sqoopAdvancedParam.getProp() +
+                            " " + sqoopAdvancedParam.getValue();
+                    if (StringUtils.isNotEmpty(sqoopAdvancedParamStr)) {
+                        result.append(sqoopAdvancedParamStr);
+                    }
+                }
+            }
+
             if(sqoopParameters.getConcurrency() >0){
                 result.append(" -m ")
                         .append(sqoopParameters.getConcurrency());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java
index 4e9cb84ff3..6bc94d0f35 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
 
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HdfsSourceGenerator;
@@ -62,14 +63,23 @@ public class SqoopJobGenerator {
      * @return
      */
     public String generateSqoopJob(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext){
-        createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType());
-        if(sourceGenerator == null || targetGenerator == null){
-            return null;
+
+        String sqoopScripts = "";
+
+        if (SqoopJobType.TEMPLATE.getDescp().equals(sqoopParameters.getJobType())) {
+            createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType());
+            if(sourceGenerator == null || targetGenerator == null){
+                throw new RuntimeException("sqoop task source type or target type is null");
+            }
+
+            sqoopScripts = commonGenerator.generate(sqoopParameters)
+                    + sourceGenerator.generate(sqoopParameters,taskExecutionContext)
+                    + targetGenerator.generate(sqoopParameters,taskExecutionContext);
+        } else if (SqoopJobType.CUSTOM.getDescp().equals(sqoopParameters.getJobType())) {
+            sqoopScripts = sqoopParameters.getCustomShell().replaceAll("\\r\\n", "\n");
         }
 
-        return commonGenerator.generate(sqoopParameters)
-                + sourceGenerator.generate(sqoopParameters,taskExecutionContext)
-                + targetGenerator.generate(sqoopParameters,taskExecutionContext);
+        return sqoopScripts;
     }
 
     /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
index f8e3d57c7d..402f4f8ee0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
@@ -77,19 +77,19 @@ public class MysqlSourceGenerator implements ISourceGenerator {
                 }else{
                     srcQuery += " WHERE $CONDITIONS";
                 }
-                result.append(" --query \'"+srcQuery+"\'");
+                result.append(" --query \'").append(srcQuery).append("\'");
             }
 
             List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
 
             if(mapColumnHive != null && !mapColumnHive.isEmpty()){
-                String columnMap = "";
+                StringBuilder columnMap = new StringBuilder();
                 for(Property item:mapColumnHive){
-                    columnMap = item.getProp()+"="+ item.getValue()+",";
+                    columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
                 }
 
-                if(StringUtils.isNotEmpty(columnMap)){
+                if(StringUtils.isNotEmpty(columnMap.toString())){
                     result.append(" --map-column-hive ")
                             .append(columnMap.substring(0,columnMap.length()-1));
                 }
@@ -98,12 +98,12 @@ public class MysqlSourceGenerator implements ISourceGenerator {
 
             List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
 
             if(mapColumnJava != null && !mapColumnJava.isEmpty()){
-                String columnMap = "";
+                StringBuilder columnMap = new StringBuilder();
                 for(Property item:mapColumnJava){
-                    columnMap = item.getProp()+"="+ item.getValue()+",";
+                    columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
                 }
 
-                if(StringUtils.isNotEmpty(columnMap)){
+                if(StringUtils.isNotEmpty(columnMap.toString())){
                     result.append(" --map-column-java ")
                             .append(columnMap.substring(0,columnMap.length()-1));
                 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java
index aed8b9e24a..5b0b75d8e2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java
@@ -78,7 +78,7 @@ public class MysqlTargetGenerator implements ITargetGenerator {
                 result.append(" --lines-terminated-by '").append(targetMysqlParameter.getLinesTerminated()).append("'");
             }
 
-            if(targetMysqlParameter.isUpdate()
+            if(targetMysqlParameter.getIsUpdate()
                     && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey())
                     && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())){
                 result.append(" --update-key ").append(targetMysqlParameter.getTargetUpdateKey())
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
index bfc8205c2d..0f18b4e2ac 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
@@ -17,11 +17,9 @@
 package org.apache.dolphinscheduler.server.worker.task.sqoop;
 
 import com.alibaba.fastjson.JSON;
-import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -35,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
 
-import java.util.*;
+import java.util.Date;
 
 /**
  * sqoop task test
@@ -52,64 +50,98 @@ public class SqoopTaskTest {
 
     @Before
     public void before() throws Exception{
         processService = Mockito.mock(ProcessService.class);
-        Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
         applicationContext = Mockito.mock(ApplicationContext.class);
         SpringApplicationContext springApplicationContext = new SpringApplicationContext();
         springApplicationContext.setApplicationContext(applicationContext);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
-        TaskProps props = new TaskProps();
-        props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
-        props.setTenantCode("1");
-        props.setEnvFile(".dolphinscheduler_env.sh");
-        props.setTaskStartTime(new Date());
-        props.setTaskTimeout(0);
-        props.setTaskParams("{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
-
-        sqoopTask = new SqoopTask(new TaskExecutionContext(),logger);
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskAppId(String.valueOf(System.currentTimeMillis()));
+        taskExecutionContext.setTenantCode("1");
+        taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh");
+        taskExecutionContext.setStartTime(new Date());
+        taskExecutionContext.setTaskTimeout(0);
+        taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1," +
+                "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\"," +
+                "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\"," +
+                "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[]," +
+                "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\"" +
+                ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true," +
+                "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\"," +
+                "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
+
+        sqoopTask = new SqoopTask(taskExecutionContext,logger);
+        //test sqoop task init method
+        sqoopTask.init();
     }
 
+    /**
+     * test SqoopJobGenerator
+     */
     @Test
     public void testGenerator(){
-        String data1 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
-        SqoopParameters sqoopParameters1 = JSON.parseObject(data1,SqoopParameters.class);
+        TaskExecutionContext mysqlTaskExecutionContext = getMysqlTaskExecutionContext();
 
+        //sqoop TEMPLATE job
+        //import mysql to HDFS with hadoop custom params and sqoop advanced params
+        String mysqlToHdfs = "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]," +
+                "\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        SqoopParameters mysqlToHdfsParams = JSON.parseObject(mysqlToHdfs,SqoopParameters.class);
         SqoopJobGenerator generator = new SqoopJobGenerator();
-        String script = generator.generateSqoopJob(sqoopParameters1,new TaskExecutionContext());
-        String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
-        Assert.assertEquals(expected, script);
-
-        String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
-        SqoopParameters sqoopParameters2 = JSON.parseObject(data2,SqoopParameters.class);
-
-        String script2 = generator.generateSqoopJob(sqoopParameters2,new TaskExecutionContext());
-        String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
-        Assert.assertEquals(expected2, script2);
-
-        String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
-        SqoopParameters sqoopParameters3 = JSON.parseObject(data3,SqoopParameters.class);
-
-        String script3 = generator.generateSqoopJob(sqoopParameters3,new TaskExecutionContext());
-        String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'";
-        Assert.assertEquals(expected3, script3);
-
-        String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
-        SqoopParameters sqoopParameters4 = JSON.parseObject(data4,SqoopParameters.class);
-
-        String script4 = generator.generateSqoopJob(sqoopParameters4,new TaskExecutionContext());
-        String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
-        Assert.assertEquals(expected4, script4);
+        String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams,mysqlTaskExecutionContext);
+        String mysqlToHdfsExpected = "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
+        Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript);
+
+        //export hdfs to mysql using update mode
+        String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\"," +
+                "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\"," +
+                "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        SqoopParameters hdfsToMysqlParams = JSON.parseObject(hdfsToMysql,SqoopParameters.class);
+        String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams,mysqlTaskExecutionContext);
+        String hdfsToMysqlScriptExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
+        Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript);
+
+        //export hive to mysql
+        String hiveToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        SqoopParameters hiveToMysqlParams = JSON.parseObject(hiveToMysql,SqoopParameters.class);
+        String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams,mysqlTaskExecutionContext);
+        String hiveToMysqlExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'";
+        Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript);
+
+        //import mysql to hive
+        String mysqlToHive = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
+        SqoopParameters mysqlToHiveParams = JSON.parseObject(mysqlToHive,SqoopParameters.class);
+        String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams,mysqlTaskExecutionContext);
+        String mysqlToHiveExpected = "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
+        Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);
+
+        //sqoop CUSTOM job
+        String sqoopCustomString = "{\"jobType\":\"CUSTOM\",\"localParams\":[],\"customShell\":\"sqoop import\"}";
+        SqoopParameters sqoopCustomParams = JSON.parseObject(sqoopCustomString, SqoopParameters.class);
+        String sqoopCustomScript = generator.generateSqoopJob(sqoopCustomParams, new TaskExecutionContext());
+        String sqoopCustomExpected = "sqoop import";
+        Assert.assertEquals(sqoopCustomExpected, sqoopCustomScript);
     }
 
-    private DataSource getDataSource() {
-        DataSource dataSource = new DataSource();
-        dataSource.setType(DbType.MYSQL);
-        dataSource.setConnectionParams(
-                "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}");
-        dataSource.setUserId(1);
-        return dataSource;
+
+    /**
+     * get a taskExecutionContext with mysql source and target connection params
+     * @return TaskExecutionContext
+     */
+    private TaskExecutionContext getMysqlTaskExecutionContext() {
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
+        String mysqlSourceConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+        String mysqlTargetConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+        sqoopTaskExecutionContext.setDataSourceId(2);
+        sqoopTaskExecutionContext.setDataTargetId(2);
+        sqoopTaskExecutionContext.setSourcetype(0);
+        sqoopTaskExecutionContext.setTargetConnectionParams(mysqlTargetConnectionParams);
+        sqoopTaskExecutionContext.setSourceConnectionParams(mysqlSourceConnectionParams);
+        sqoopTaskExecutionContext.setTargetType(0);
+        taskExecutionContext.setSqoopTaskExecutionContext(sqoopTaskExecutionContext);
+        return taskExecutionContext;
     }
 
     @Test
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue
index a173139d15..05e248f518 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue
@@ -131,7 +131,7 @@
     },
     created () {
       let supportType = this.supportType || []
-      this.typeList = _.cloneDeep(this.store.state.dag.dsTypeListS)
+      this.typeList = this.data.typeList || _.cloneDeep(this.store.state.dag.dsTypeListS)
       // Have a specified data source
       if (supportType.length) {
         let is = (type) => {
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue
index 6594ffcf74..4bed92d27a 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue
@@ -1,134 +1,194 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
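
For reviewers, a minimal end-to-end sketch of the new jobType branch, mirroring the CUSTOM case covered by SqoopTaskTest above. The standalone class and main method are illustrative only and not part of this patch; it only calls APIs introduced or exercised by the diff.

// Illustrative sketch (not part of the patch): drives the CUSTOM branch of
// SqoopParameters.checkParameters() and SqoopJobGenerator.generateSqoopJob().
import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;

public class SqoopCustomJobSketch {
    public static void main(String[] args) {
        String custom = "{\"jobType\":\"CUSTOM\",\"localParams\":[],\"customShell\":\"sqoop import\"}";
        SqoopParameters params = JSON.parseObject(custom, SqoopParameters.class);

        // a CUSTOM job is valid when customShell is non-empty and jobName is empty
        System.out.println(params.checkParameters());   // true

        // CUSTOM jobs skip the template source/target generators; the user shell
        // is returned as-is, with CRLF normalized to LF
        String script = new SqoopJobGenerator().generateSqoopJob(params, new TaskExecutionContext());
        System.out.println(script);                     // sqoop import
    }
}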