From a8baa9553fd64d1414c204c6ba95019b7376f771 Mon Sep 17 00:00:00 2001 From: OS <29528966+lenboo@users.noreply.github.com> Date: Fri, 8 Oct 2021 14:08:12 +0800 Subject: [PATCH] [Bug-6455][Master]fix bug 6455: cannot stop sub-task (#6458) * fix bug: cannot stop the task. * fix bug: cannot stop the task. * remove the check thread number --- .../master/processor/StateEventProcessor.java | 6 +++++- .../master/runner/WorkflowExecuteThread.java | 2 +- .../runner/task/CommonTaskProcessor.java | 2 +- .../master/runner/task/SubTaskProcessor.java | 18 ++++++++++++++++-- .../service/process/ProcessService.java | 4 ---- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java index d5a8e85b5d..2f9a634250 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -61,8 +61,12 @@ public class StateEventProcessor implements NettyRequestProcessor { StateEventChangeCommand stateEventChangeCommand = JSONUtils.parseObject(command.getBody(), StateEventChangeCommand.class); StateEvent stateEvent = new StateEvent(); - stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); stateEvent.setKey(stateEventChangeCommand.getKey()); + if (stateEventChangeCommand.getSourceProcessInstanceId() != stateEventChangeCommand.getDestProcessInstanceId()) { + stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + } else { + stateEvent.setExecutionStatus(stateEventChangeCommand.getSourceStatus()); + } stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId()); stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId()); StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 7ed2679e72..ad444070be 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -416,7 +416,7 @@ public class WorkflowExecuteThread implements Runnable { if (stateEvent.getExecutionStatus().typeIsFinished()) { endProcess(); } - if (stateEvent.getExecutionStatus() == ExecutionStatus.READY_STOP) { + if (processInstance.getState() == ExecutionStatus.READY_STOP) { killAllTasks(); } return true; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 4296b85fa4..ee1c548525 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -55,7 +55,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { MasterConfig masterConfig; @Autowired - NettyExecutorManager nettyExecutorManager; + NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); /** * logger of MasterBaseTaskExecThread diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 7a4be5830c..e0cd3e8603 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -23,6 +23,9 @@ import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; +import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; import java.util.concurrent.locks.Lock; @@ -43,6 +46,8 @@ public class SubTaskProcessor extends BaseTaskProcessor { */ private final Lock runLock = new ReentrantLock(); + private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); + @Override public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { this.processInstance = processInstance; @@ -121,8 +126,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { } subProcessInstance.setState(ExecutionStatus.READY_PAUSE); processService.updateProcessInstance(subProcessInstance); - //TODO... - // send event to sub process master + sendToSubProcess(); return true; } @@ -157,9 +161,19 @@ public class SubTaskProcessor extends BaseTaskProcessor { } subProcessInstance.setState(ExecutionStatus.READY_STOP); processService.updateProcessInstance(subProcessInstance); + sendToSubProcess(); return true; } + private void sendToSubProcess() { + StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( + processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0 + ); + String address = subProcessInstance.getHost().split(":")[0]; + int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]); + this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + } + @Override public String getType() { return TaskType.SUB_PROCESS.getDesc(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 6821967d3c..e78112a33b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -222,10 +222,6 @@ public class ProcessService { moveToErrorCommand(command, "process instance is null"); return null; } - if (!checkThreadNum(command, validThreadNum)) { - logger.info("there is not enough thread for this command: {}", command); - return setWaitingThreadProcess(command, processInstance); - } processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance);