From 6868876a29c3099a3ea82ce1b65e82c9302fdd8d Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Thu, 15 Sep 2022 09:06:10 +0800 Subject: [PATCH] fix workflow keep running when task fail (#11930) --- .../master/runner/WorkflowExecuteRunnable.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 8e4e64528e..67ac86808c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -116,6 +116,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.BeanUtils; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * Workflow execute task, used to execute a workflow instance. @@ -180,9 +181,9 @@ public class WorkflowExecuteRunnable implements Callable { private final Map completeTaskMap = new ConcurrentHashMap<>(); /** - * depend failed task map, taskCode as key, taskId as value + * depend failed task set */ - private final Map dependFailedTaskMap = new ConcurrentHashMap<>(); + private final Set dependFailedTaskSet = Sets.newConcurrentHashSet(); /** * forbidden task map, code as key @@ -805,7 +806,7 @@ public class WorkflowExecuteRunnable implements Callable { taskFailedSubmit = false; activeTaskProcessorMaps.clear(); - dependFailedTaskMap.clear(); + dependFailedTaskSet.clear(); completeTaskMap.clear(); errorTaskMap.clear(); @@ -908,8 +909,8 @@ public class WorkflowExecuteRunnable implements Callable { } } } - logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}", - dependFailedTaskMap, + logger.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}", + dependFailedTaskSet, completeTaskMap, errorTaskMap); } @@ -1494,7 +1495,7 @@ public class WorkflowExecuteRunnable implements Callable { if (this.errorTaskMap.size() > 0) { return true; } - return this.dependFailedTaskMap.size() > 0; + return this.dependFailedTaskSet.size() > 0; } /** @@ -1845,7 +1846,7 @@ public class WorkflowExecuteRunnable implements Callable { } } else if (DependResult.FAILED == dependResult) { // if the dependency fails, the current node is not submitted and the state changes to failure. - dependFailedTaskMap.put(task.getTaskCode(), task.getId()); + dependFailedTaskSet.add(task.getTaskCode()); removeTaskFromStandbyList(task); logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);