Browse Source

fix dag.getPreviousNodes miss upstream node (#13255)

3.2.0-release
JieguangZhou 2 years ago committed by GitHub
parent
commit
2e95a020ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

14
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -153,6 +153,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private DAG<String, TaskNode, TaskNodeRelation> dag; private DAG<String, TaskNode, TaskNodeRelation> dag;
/**
* full task node map, key is task node id, value is task node
* # TODO: This field can be removed later if the dag is complete
*/
private Map<Long, TaskNode> taskNodesMap;
/** /**
* unique key of workflow * unique key of workflow
*/ */
@ -809,6 +815,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
}); });
taskNodesMap = taskNodeList.stream().collect(Collectors.toMap(TaskNode::getCode, taskNode -> taskNode));
// generate process to get DAG info // generate process to get DAG info
List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList); List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
@ -1857,8 +1865,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
// init varPool only this task is the first time running // init varPool only this task is the first time running
if (task.isFirstRun()) { if (task.isFirstRun()) {
// get pre task ,get all the task varPool to this task // get pre task ,get all the task varPool to this task
Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode())); // Do not use dag.getPreviousNodes because of the dag may be miss the upstream node
getPreVarPool(task, preTask); String preTasks = taskNodesMap.get(task.getTaskCode()).getPreTasks();
Set<String> preTaskList = new HashSet<>(JSONUtils.toList(preTasks, String.class));
getPreVarPool(task, preTaskList);
} }
DependResult dependResult = getDependResultForTask(task); DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) { if (DependResult.SUCCESS == dependResult) {

Loading…
Cancel
Save