From 71b11e2c08b7813d3b14c4dc7e6724ff36692934 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Fri, 13 Mar 2020 15:28:22 +0800 Subject: [PATCH] DataxTask process test modify (#2162) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor modify * TaskAckProcessor modify * task prioriry refator * remove ITaskQueue * task prioriry refator * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor * MasterSchedulerService modify * WorkerConfig listen port modify * modify master and worker listen port * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * Encapsulate the parameters required by sqltask * 1,Encapsulate the parameters required by sqltask 2,SQLTask optimization * AbstractTask modify * ProcedureTask optimization * MasterSchedulerService modify * TaskUpdateQueueConsumer modify * test * DataxTask process run debug * DataxTask process run debug * add protobuf dependency,MR、Spark task etc need this * TaskUpdateQueueConsumer modify Co-authored-by: qiaozhanwei --- dolphinscheduler-server/pom.xml | 4 ---- .../master/consumer/TaskUpdateQueueConsumer.java | 13 +++++++++++++ .../server/worker/task/datax/DataxTask.java | 3 +-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml index 080b87ebaa..8dfee12127 100644 --- a/dolphinscheduler-server/pom.xml +++ b/dolphinscheduler-server/pom.xml @@ -36,10 +36,6 @@ org.apache.dolphinscheduler dolphinscheduler-common - - protobuf-java - com.google.protobuf - io.netty netty diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java index b54f39f32d..8b00133b0f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.thread.Stopper; @@ -178,7 +179,19 @@ public class TaskUpdateQueueConsumer extends Thread{ // DATAX task if (taskType == TaskType.DATAX){ + DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class); + DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); + DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); + + + dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); + dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); + dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + + dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); + dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); + dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index 1eee6f2d47..391f522363 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -216,8 +216,7 @@ public class DataxTask extends AbstractTask { * @return collection of datax job config JSONObject * @throws SQLException if error throws SQLException */ - private List buildDataxJobContentJson() - throws SQLException { + private List buildDataxJobContentJson() throws SQLException { DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext();