diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index e51f9f9967..229d9088cc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -121,11 +121,17 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @Override public void onSuccess(Object result) { - if (workflowExecuteThread.workFlowFinish()) { - stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance()); - processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); - notifyProcessChanged(workflowExecuteThread.getProcessInstance()); - logger.info("process instance {} finished.", processInstanceId); + // if an exception occurs, first, the error message cannot be printed in the log; + // secondly, the `multiThreadFilterMap` cannot be remove the `workflowExecuteThread`, resulting in the state of process instance cannot be changed and memory leak + try { + if (workflowExecuteThread.workFlowFinish()) { + stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance()); + processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); + notifyProcessChanged(workflowExecuteThread.getProcessInstance()); + logger.info("process instance {} finished.", processInstanceId); + } + } catch (Exception e) { + logger.error("handle events {} success, but notify changed error", processInstanceId, e); } multiThreadFilterMap.remove(workflowExecuteThread.getKey()); }