Browse Source

fix workflow keep running when task fail (#11930)

3.2.0-release
caishunfeng 2 years ago committed by GitHub
parent
commit
6868876a29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

15
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -116,6 +116,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/** /**
* Workflow execute task, used to execute a workflow instance. * Workflow execute task, used to execute a workflow instance.
@ -180,9 +181,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>(); private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
/** /**
* depend failed task map, taskCode as key, taskId as value * depend failed task set
*/ */
private final Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>(); private final Set<Long> dependFailedTaskSet = Sets.newConcurrentHashSet();
/** /**
* forbidden task map, code as key * forbidden task map, code as key
@ -805,7 +806,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
taskFailedSubmit = false; taskFailedSubmit = false;
activeTaskProcessorMaps.clear(); activeTaskProcessorMaps.clear();
dependFailedTaskMap.clear(); dependFailedTaskSet.clear();
completeTaskMap.clear(); completeTaskMap.clear();
errorTaskMap.clear(); errorTaskMap.clear();
@ -908,8 +909,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
} }
} }
logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}", logger.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}",
dependFailedTaskMap, dependFailedTaskSet,
completeTaskMap, completeTaskMap,
errorTaskMap); errorTaskMap);
} }
@ -1494,7 +1495,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (this.errorTaskMap.size() > 0) { if (this.errorTaskMap.size() > 0) {
return true; return true;
} }
return this.dependFailedTaskMap.size() > 0; return this.dependFailedTaskSet.size() > 0;
} }
/** /**
@ -1845,7 +1846,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
} else if (DependResult.FAILED == dependResult) { } else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure. // if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTaskMap.put(task.getTaskCode(), task.getId()); dependFailedTaskSet.add(task.getTaskCode());
removeTaskFromStandbyList(task); removeTaskFromStandbyList(task);
logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(),
dependResult); dependResult);

Loading…
Cancel
Save