From 00cf2598405c8d3193af332c981b609e81428b0d Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Mon, 24 Feb 2020 10:00:04 +0800 Subject: [PATCH] buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify --- .../server/worker/runner/TaskScheduleThread.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index b288aeace3..735e4ba563 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -97,7 +97,7 @@ public class TaskScheduleThread implements Runnable { try { // tell master that task is in executing - ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType()); + ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext); taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); logger.info("script path : {}", taskExecutionContext.getExecutePath()); @@ -182,17 +182,20 @@ public class TaskScheduleThread implements Runnable { } return globalParamsMap; } + /** - * build ack command - * @param taskType taskType + * build ack command + * @param taskExecutionContext taskExecutionContext + * @return ExecuteTaskAckCommand */ - private ExecuteTaskAckCommand buildAckCommand(String taskType) { + private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) { ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand(); + ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); ackCommand.setLogPath(getTaskLogPath()); ackCommand.setHost(OSUtils.getHost()); ackCommand.setStartTime(new Date()); - if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){ + if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ ackCommand.setExecutePath(null); }else{ ackCommand.setExecutePath(taskExecutionContext.getExecutePath());