From 1276e0d8e116f0fa66cebf5226ff0bb9cc77e048 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 7 Apr 2020 17:04:17 +0800 Subject: [PATCH] refactor-worker merge to dev bug fix --- .../consumer/TaskPriorityQueueConsumer.java | 44 ++++++++++++------- .../worker/runner/TaskExecuteThread.java | 7 +-- .../worker/task/sqoop/SqoopTaskTest.java | 8 ++-- 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 4cc4fcf1a3..50c851c483 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; @@ -44,7 +47,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -219,13 +221,17 @@ public class TaskPriorityQueueConsumer extends Thread{ DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); - dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); - dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); - dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + if (dataSource != null){ + dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); + dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); + dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + } - dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); - dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); - dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + if (dataTarget != null){ + dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); + dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); + dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + } } @@ -235,19 +241,25 @@ public class TaskPriorityQueueConsumer extends Thread{ * @param taskNode taskNode */ private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) { - DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class); + SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class); - DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); - DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); + SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class); + TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class); + DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource()); + DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource()); - sqoopTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); - sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); - sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + if (dataSource != null){ + sqoopTaskExecutionContext.setDataSourceId(dataSource.getId()); + sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); + sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + } - sqoopTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); - sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); - sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + if (dataTarget != null){ + sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId()); + sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); + sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + } } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 468be1c34e..8cdbf60503 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -21,8 +21,6 @@ import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.util.*; import java.util.stream.Collectors; -import java.util.stream.Stream; /** @@ -213,6 +210,10 @@ public class TaskExecuteThread implements Runnable { List projectRes, String tenantCode, Logger logger) throws Exception { + if (CollectionUtils.isEmpty(projectRes)){ + return; + } + for (String resource : projectRes) { File resFile = new File(execLocalPath, resource); if (!resFile.exists()) { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index 5694599a38..bfc8205c2d 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -76,28 +76,28 @@ public class SqoopTaskTest { SqoopParameters sqoopParameters1 = JSON.parseObject(data1,SqoopParameters.class); SqoopJobGenerator generator = new SqoopJobGenerator(); - String script = generator.generateSqoopJob(sqoopParameters1); + String script = generator.generateSqoopJob(sqoopParameters1,new TaskExecutionContext()); String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'"; Assert.assertEquals(expected, script); String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; SqoopParameters sqoopParameters2 = JSON.parseObject(data2,SqoopParameters.class); - String script2 = generator.generateSqoopJob(sqoopParameters2); + String script2 = generator.generateSqoopJob(sqoopParameters2,new TaskExecutionContext()); String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert"; Assert.assertEquals(expected2, script2); String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; SqoopParameters sqoopParameters3 = JSON.parseObject(data3,SqoopParameters.class); - String script3 = generator.generateSqoopJob(sqoopParameters3); + String script3 = generator.generateSqoopJob(sqoopParameters3,new TaskExecutionContext()); String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'"; Assert.assertEquals(expected3, script3); String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; SqoopParameters sqoopParameters4 = JSON.parseObject(data4,SqoopParameters.class); - String script4 = generator.generateSqoopJob(sqoopParameters4); + String script4 = generator.generateSqoopJob(sqoopParameters4,new TaskExecutionContext()); String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; Assert.assertEquals(expected4, script4);