From 36284f9b9cd9bc4fabbfc05891aefe7e96546a42 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 1 Feb 2023 15:31:04 +0800 Subject: [PATCH] cherry-pick Fix task wake up failed will block the event processing --- .../common/enums/StateEventType.java | 2 +- .../master/event/TaskWaitTaskGroupStateHandler.java | 12 +++++++++--- .../server/master/processor/TaskEventProcessor.java | 2 +- .../master/runner/WorkflowExecuteRunnable.java | 2 +- .../master/runner/WorkflowExecuteThreadPool.java | 2 +- .../service/process/ProcessServiceImpl.java | 1 + 6 files changed, 14 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java index bc021e5e08..5afadaaf06 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java @@ -25,7 +25,7 @@ public enum StateEventType { TASK_STATE_CHANGE(1, "task state change"), PROCESS_TIMEOUT(2, "process timeout"), TASK_TIMEOUT(3, "task timeout"), - WAIT_TASK_GROUP(4, "wait task group"), + WAKE_UP_TASK_GROUP(4, "wait task group"), TASK_RETRY(5, "task retry"), PROCESS_BLOCKED(6, "process blocked"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java index 8dcca91d89..da8b564ed8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java @@ -30,13 +30,19 @@ public class TaskWaitTaskGroupStateHandler implements StateEventHandler { private static final Logger logger = LoggerFactory.getLogger(TaskWaitTaskGroupStateHandler.class); @Override - public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) { + public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, + StateEvent stateEvent) { logger.info("Handle task instance wait task group event, taskInstanceId: {}", stateEvent.getTaskInstanceId()); - return workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent); + if (workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) { + logger.info("Success wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId()); + } else { + logger.info("Failed to wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId()); + } + return true; } @Override public StateEventType getEventType() { - return StateEventType.WAIT_TASK_GROUP; + return StateEventType.WAKE_UP_TASK_GROUP; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java index ce31b1a018..4362850845 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java @@ -56,7 +56,7 @@ public class TaskEventProcessor implements NettyRequestProcessor { .processInstanceId(taskEventChangeCommand.getProcessInstanceId()) .taskInstanceId(taskEventChangeCommand.getTaskInstanceId()) .key(taskEventChangeCommand.getKey()) - .type(StateEventType.WAIT_TASK_GROUP) + .type(StateEventType.WAKE_UP_TASK_GROUP) .build(); try { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 939f4baca0..4e3bd4d575 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -443,7 +443,7 @@ public class WorkflowExecuteRunnable implements Callable { TaskStateEvent nextEvent = TaskStateEvent.builder() .processInstanceId(processInstance.getId()) .taskInstanceId(nextTaskInstance.getId()) - .type(StateEventType.WAIT_TASK_GROUP) + .type(StateEventType.WAKE_UP_TASK_GROUP) .build(); this.stateEvents.add(nextEvent); } else { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 388637a567..e26df60244 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -105,7 +105,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { return; } if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) { - logger.warn("The workflow has been executed by another thread"); + logger.debug("The workflow has been executed by another thread"); return; } multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 73378a9163..98cfe5e39e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -3135,6 +3135,7 @@ public class ProcessServiceImpl implements ProcessService { processInstance.getId(), taskId); Host host = new Host(processInstance.getHost()); stateEventCallbackService.sendResult(host, taskEventChangeCommand.convert2Command(taskType)); + logger.info("Success send command to master: {}, command: {}", host, taskEventChangeCommand); } @Override