From 3e5781b6e1c38675ba134048b21941be63723cb6 Mon Sep 17 00:00:00 2001 From: guoshupei <15764973965@163.com> Date: Fri, 1 Apr 2022 22:23:35 +0800 Subject: [PATCH] [Fix-9065] [master] when task submit failed, remove from queue and add state event to thread (#9186) * [Fix-9065] [master] when task submit failed, remove from queue and add state event to thread This closes #9065 * Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java yes, your idea is better Co-authored-by: caishunfeng * [Bug-9065] [master] update logger info Co-authored-by: guoshupei Co-authored-by: caishunfeng --- .../server/master/runner/WorkflowExecuteThread.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 1d3413b8c7..9c1aa90c79 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -1637,7 +1637,9 @@ public class WorkflowExecuteThread { stateEvent.setExecutionStatus(processInstance.getState()); stateEvent.setProcessInstanceId(this.processInstance.getId()); stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE); - this.processStateChangeHandler(stateEvent); +// this.processStateChangeHandler(stateEvent); + // replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks + this.stateEvents.add(stateEvent); } } @@ -1800,6 +1802,11 @@ public class WorkflowExecuteThread { TaskInstance taskInstance = submitTaskExec(task); if (taskInstance == null) { this.taskFailedSubmit = true; + // Remove and add to complete map and error map + removeTaskFromStandbyList(task); + completeTaskMap.put(task.getTaskCode(), task.getId()); + errorTaskMap.put(task.getTaskCode(), task.getId()); + logger.error("process {}, task {}, code:{} submit task failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode()); } else { removeTaskFromStandbyList(task); }