From b391b74df4baf7b6f90df79df584e391ee799279 Mon Sep 17 00:00:00 2001 From: fuchanghai <33984497+fuchanghai@users.noreply.github.com> Date: Tue, 1 Aug 2023 11:04:31 +0800 Subject: [PATCH] [fix-#14537] the branch that needs to be executed overlaps with another branch, it may not be able to complete the normal execution (#14563) --- .../service/utils/DagHelper.java | 55 +++++++++++++++++-- .../service/utils/DagHelperTest.java | 49 ++++++++++++++--- 2 files changed, 89 insertions(+), 15 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java index b966d93997..ee5e97cf55 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java @@ -414,29 +414,72 @@ public class DagHelper { return conditionTaskList; } - private static List skipTaskNode4Switch(TaskNode taskNode, - Map skipTaskNodeList, - Map completeTaskList, - DAG dag) { + public static List skipTaskNode4Switch(TaskNode taskNode, + Map skipTaskNodeList, + Map completeTaskList, + DAG dag) { SwitchParameters switchParameters = completeTaskList.get(taskNode.getCode()).getSwitchDependency(); int resultConditionLocation = switchParameters.getResultConditionLocation(); List conditionResultVoList = switchParameters.getDependTaskList(); List switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode(); + Set switchNeedWorkCodes = new HashSet<>(); if (CollectionUtils.isEmpty(switchTaskList)) { - switchTaskList = new ArrayList<>(); + return new ArrayList<>(); + } + // get all downstream nodes of the branch that the switch node needs to execute + for (Long switchTaskCode : switchTaskList) { + getSwitchNeedWorkCodes(switchTaskCode, dag, switchNeedWorkCodes); } // conditionResultVoList.remove(resultConditionLocation); for (SwitchResultVo info : conditionResultVoList) { if (CollectionUtils.isEmpty(info.getNextNode())) { continue; } - setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, skipTaskNodeList); + for (Long nextNode : info.getNextNode()) { + setSwitchTaskNodeSkip(nextNode, dag, completeTaskList, skipTaskNodeList, + switchNeedWorkCodes); + } } return switchTaskList; } + /** + * get all downstream nodes of the branch that the switch node needs to execute + * @param taskCode + * @param dag + * @param switchNeedWorkCodes + */ + public static void getSwitchNeedWorkCodes(Long taskCode, DAG dag, + Set switchNeedWorkCodes) { + switchNeedWorkCodes.add(taskCode); + Set subsequentNodes = dag.getSubsequentNodes(taskCode); + if (org.apache.commons.collections.CollectionUtils.isNotEmpty(subsequentNodes)) { + for (Long subCode : subsequentNodes) { + getSwitchNeedWorkCodes(subCode, dag, switchNeedWorkCodes); + } + } + } + + private static void setSwitchTaskNodeSkip(Long skipNodeCode, + DAG dag, + Map completeTaskList, + Map skipTaskNodeList, + Set switchNeedWorkCodes) { + // ignore when the node that needs to be skipped exists on the branch that the switch type node needs to execute + if (!dag.containsNode(skipNodeCode) || switchNeedWorkCodes.contains(skipNodeCode)) { + return; + } + skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode)); + Collection postNodeList = dag.getSubsequentNodes(skipNodeCode); + for (Long post : postNodeList) { + TaskNode postNode = dag.getNode(post); + if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) { + setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList); + } + } + } /** * set task node and the post nodes skip flag */ diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java index 85c9296248..c19812303c 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java @@ -330,6 +330,24 @@ public class DagHelperTest { Assertions.assertEquals(1, postNodes.size()); } + @Test + public void testSwitchPostNode() throws IOException { + DAG dag = generateDag2(); + Map skipTaskNodeList = new HashMap<>(); + Map completeTaskList = new HashMap<>(); + completeTaskList.put(0l, new TaskInstance()); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setState(TaskExecutionStatus.SUCCESS); + taskInstance.setTaskCode(1l); + Map taskParamsMap = new HashMap<>(); + taskParamsMap.put(Constants.SWITCH_RESULT, ""); + taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap)); + taskInstance.setSwitchDependency(getSwitchNode()); + completeTaskList.put(1l, taskInstance); + DagHelper.skipTaskNode4Switch(dag.getNode(1l), skipTaskNodeList, completeTaskList, dag); + Assertions.assertNotNull(skipTaskNodeList.get(2L)); + Assertions.assertEquals(1, skipTaskNodeList.size()); + } /** * process: * 1->2->3->5->7 @@ -436,11 +454,13 @@ public class DagHelperTest { /** * DAG graph: - * 2 - * ↑ - * 0->1(switch) - * ↓ - * 4 + * -> 2-> + * / \ + * / \ + * 0->1(switch)->5 6 + * \ / + * \ / + * -> 4-> * * @return dag * @throws JsonProcessingException if error throws JsonProcessingException @@ -484,15 +504,26 @@ public class DagHelperTest { taskNodeList.add(node4); TaskNode node5 = new TaskNode(); - node5.setId("4"); - node5.setName("4"); - node5.setCode(4); + node5.setId("5"); + node5.setName("5"); + node5.setCode(5); node5.setType("SHELL"); List dep5 = new ArrayList<>(); dep5.add(1L); node5.setPreTasks(JSONUtils.toJsonString(dep5)); taskNodeList.add(node5); + TaskNode node6 = new TaskNode(); + node5.setId("6"); + node5.setName("6"); + node5.setCode(6); + node5.setType("SHELL"); + List dep6 = new ArrayList<>(); + dep5.add(2L); + dep5.add(4L); + node5.setPreTasks(JSONUtils.toJsonString(dep6)); + taskNodeList.add(node6); + List startNodes = new ArrayList<>(); List recoveryNodes = new ArrayList<>(); List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, @@ -518,7 +549,7 @@ public class DagHelperTest { conditionsParameters.setDependTaskList(list); conditionsParameters.setNextNode(5L); conditionsParameters.setRelation("AND"); - + conditionsParameters.setResultConditionLocation(1); // in: AND(AND(1 is SUCCESS)) return conditionsParameters; }