From 38b643f69b65f4de9dd43809404470934bfadc7b Mon Sep 17 00:00:00 2001 From: Yann Ann Date: Wed, 19 Oct 2022 09:36:47 +0800 Subject: [PATCH] make sure all failed task will save in errorTaskMap (#12424) --- .../runner/WorkflowExecuteRunnable.java | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 962ed187d6..403135f3bd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -391,13 +391,13 @@ public class WorkflowExecuteRunnable implements Callable { retryTaskInstance(taskInstance); } else if (taskInstance.getState().isFailure()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); // There are child nodes and the failure policy is: CONTINUE if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode( Long.toString(taskInstance.getTaskCode()), dag)) { submitPostNode(Long.toString(taskInstance.getTaskCode())); } else { - errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); if (processInstance.getFailureStrategy() == FailureStrategy.END) { killAllTasks(); } @@ -421,7 +421,6 @@ public class WorkflowExecuteRunnable implements Callable { /** * release task group * - * @param taskInstance */ public void releaseTaskGroup(TaskInstance taskInstance) { logger.info("Release task group"); @@ -448,7 +447,6 @@ public class WorkflowExecuteRunnable implements Callable { /** * crate new task instance to retry, different objects from the original * - * @param taskInstance */ private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException { if (!taskInstance.taskCanRetry()) { @@ -662,10 +660,7 @@ public class WorkflowExecuteRunnable implements Callable { } private boolean needComplementProcess() { - if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) { - return true; - } - return false; + return processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess(); } /** @@ -1069,7 +1064,7 @@ public class WorkflowExecuteRunnable implements Callable { /** * clone a new taskInstance for retry and reset some logic fields * - * @return + * @return taskInstance */ public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) { TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); @@ -1097,7 +1092,7 @@ public class WorkflowExecuteRunnable implements Callable { /** * clone a new taskInstance for tolerant and reset some logic fields * - * @return + * @return taskInstance */ public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) { TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); @@ -1119,9 +1114,9 @@ public class WorkflowExecuteRunnable implements Callable { /** * new a taskInstance * - * @param processInstance - * @param taskNode - * @return + * @param processInstance process instance + * @param taskNode task node + * @return task instance */ public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { TaskInstance taskInstance = new TaskInstance(); @@ -1445,9 +1440,7 @@ public class WorkflowExecuteRunnable implements Callable { long taskCode = Long.parseLong(dependNodeName); Integer taskInstanceId = completeTaskMap.get(taskCode); TaskExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState(); - if (depTaskState.isFailure()) { - return false; - } + return !depTaskState.isFailure(); } return true; }