From 2e95a020abb89f1958870bd79dbca41d71adc9c2 Mon Sep 17 00:00:00 2001 From: JieguangZhou Date: Thu, 22 Dec 2022 15:51:40 +0800 Subject: [PATCH] fix dag.getPreviousNodes miss upstream node (#13255) --- .../master/runner/WorkflowExecuteRunnable.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 0bd43bdb6e..5fd97b8e07 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -153,6 +153,12 @@ public class WorkflowExecuteRunnable implements Callable { private DAG dag; + /** + * full task node map, key is task node id, value is task node + * # TODO: This field can be removed later if the dag is complete + */ + private Map taskNodesMap; + /** * unique key of workflow */ @@ -809,6 +815,8 @@ public class WorkflowExecuteRunnable implements Callable { } }); + taskNodesMap = taskNodeList.stream().collect(Collectors.toMap(TaskNode::getCode, taskNode -> taskNode)); + // generate process to get DAG info List recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList); List startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); @@ -1857,8 +1865,10 @@ public class WorkflowExecuteRunnable implements Callable { // init varPool only this task is the first time running if (task.isFirstRun()) { // get pre task ,get all the task varPool to this task - Set preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode())); - getPreVarPool(task, preTask); + // Do not use dag.getPreviousNodes because of the dag may be miss the upstream node + String preTasks = taskNodesMap.get(task.getTaskCode()).getPreTasks(); + Set preTaskList = new HashSet<>(JSONUtils.toList(preTasks, String.class)); + getPreVarPool(task, preTaskList); } DependResult dependResult = getDependResultForTask(task); if (DependResult.SUCCESS == dependResult) {