diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml index 080b87ebaa..8dfee12127 100644 --- a/dolphinscheduler-server/pom.xml +++ b/dolphinscheduler-server/pom.xml @@ -36,10 +36,6 @@ org.apache.dolphinscheduler dolphinscheduler-common - - protobuf-java - com.google.protobuf - io.netty netty diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java index b54f39f32d..8b00133b0f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.thread.Stopper; @@ -178,7 +179,19 @@ public class TaskUpdateQueueConsumer extends Thread{ // DATAX task if (taskType == TaskType.DATAX){ + DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class); + DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); + DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); + + + dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); + dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); + dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + + dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); + dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); + dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index 1eee6f2d47..391f522363 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -216,8 +216,7 @@ public class DataxTask extends AbstractTask { * @return collection of datax job config JSONObject * @throws SQLException if error throws SQLException */ - private List buildDataxJobContentJson() - throws SQLException { + private List buildDataxJobContentJson() throws SQLException { DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext();