From 2ec668df9be372e639fe40747897fa1654283dc4 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 7 Apr 2020 15:23:36 +0800 Subject: [PATCH] refactor-worker merge to dev bug fix --- .../builder/TaskExecutionContextBuilder.java | 20 +++++++++----- .../consumer/TaskPriorityQueueConsumer.java | 27 +++++++++++++++++++ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 0809f31129..535c274989 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -18,10 +18,7 @@ package org.apache.dolphinscheduler.server.builder; import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.*; /** * TaskExecutionContext builder @@ -111,14 +108,25 @@ public class TaskExecutionContextBuilder { /** * build procedureTask related info * - * @param procedureTaskExecutionContext - * @return + * @param procedureTaskExecutionContext procedureTaskExecutionContext + * @return TaskExecutionContextBuilder */ public TaskExecutionContextBuilder buildProcedureTaskRelatedInfo(ProcedureTaskExecutionContext procedureTaskExecutionContext){ taskExecutionContext.setProcedureTaskExecutionContext(procedureTaskExecutionContext); return this; } + /** + * build sqoopTask related info + * + * @param sqoopTaskExecutionContext sqoopTaskExecutionContext + * @return TaskExecutionContextBuilder + */ + public TaskExecutionContextBuilder buildSqoopTaskRelatedInfo(SqoopTaskExecutionContext sqoopTaskExecutionContext){ + taskExecutionContext.setSqoopTaskExecutionContext(sqoopTaskExecutionContext); + return this; + } + /** * create diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 3092766915..735795f463 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -159,6 +159,7 @@ public class TaskPriorityQueueConsumer extends Thread{ SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext(); + SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext(); // SQL task @@ -178,6 +179,10 @@ public class TaskPriorityQueueConsumer extends Thread{ setProcedureTaskRelation(procedureTaskExecutionContext, taskNode); } + if (taskType == TaskType.SQOOP){ + setSqoopTaskRelation(sqoopTaskExecutionContext,taskNode); + } + return TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) @@ -222,6 +227,28 @@ public class TaskPriorityQueueConsumer extends Thread{ dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); } + + /** + * set datax task relation + * @param sqoopTaskExecutionContext sqoopTaskExecutionContext + * @param taskNode taskNode + */ + private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) { + DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class); + + DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); + DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); + + + sqoopTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); + sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); + sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + + sqoopTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); + sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); + sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + } + /** * set SQL task relation * @param sqlTaskExecutionContext sqlTaskExecutionContext