Browse Source

[fix-#14537] the branch that needs to be executed overlaps with another branch, it may not be able to complete the normal execution (#14563)

3.2.1-prepare
fuchanghai 1 year ago committed by GitHub
parent
commit
b391b74df4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 49
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
  2. 49
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java

49
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java

@ -414,7 +414,7 @@ public class DagHelper {
return conditionTaskList; return conditionTaskList;
} }
private static List<Long> skipTaskNode4Switch(TaskNode taskNode, public static List<Long> skipTaskNode4Switch(TaskNode taskNode,
Map<Long, TaskNode> skipTaskNodeList, Map<Long, TaskNode> skipTaskNodeList,
Map<Long, TaskInstance> completeTaskList, Map<Long, TaskInstance> completeTaskList,
DAG<Long, TaskNode, TaskNodeRelation> dag) { DAG<Long, TaskNode, TaskNodeRelation> dag) {
@ -424,19 +424,62 @@ public class DagHelper {
int resultConditionLocation = switchParameters.getResultConditionLocation(); int resultConditionLocation = switchParameters.getResultConditionLocation();
List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList(); List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList();
List<Long> switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode(); List<Long> switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode();
Set<Long> switchNeedWorkCodes = new HashSet<>();
if (CollectionUtils.isEmpty(switchTaskList)) { if (CollectionUtils.isEmpty(switchTaskList)) {
switchTaskList = new ArrayList<>(); return new ArrayList<>();
}
// get all downstream nodes of the branch that the switch node needs to execute
for (Long switchTaskCode : switchTaskList) {
getSwitchNeedWorkCodes(switchTaskCode, dag, switchNeedWorkCodes);
} }
// conditionResultVoList.remove(resultConditionLocation); // conditionResultVoList.remove(resultConditionLocation);
for (SwitchResultVo info : conditionResultVoList) { for (SwitchResultVo info : conditionResultVoList) {
if (CollectionUtils.isEmpty(info.getNextNode())) { if (CollectionUtils.isEmpty(info.getNextNode())) {
continue; continue;
} }
setTaskNodeSkip(info.getNextNode().get(0), dag, completeTaskList, skipTaskNodeList); for (Long nextNode : info.getNextNode()) {
setSwitchTaskNodeSkip(nextNode, dag, completeTaskList, skipTaskNodeList,
switchNeedWorkCodes);
}
} }
return switchTaskList; return switchTaskList;
} }
/**
* get all downstream nodes of the branch that the switch node needs to execute
* @param taskCode
* @param dag
* @param switchNeedWorkCodes
*/
public static void getSwitchNeedWorkCodes(Long taskCode, DAG<Long, TaskNode, TaskNodeRelation> dag,
Set<Long> switchNeedWorkCodes) {
switchNeedWorkCodes.add(taskCode);
Set<Long> subsequentNodes = dag.getSubsequentNodes(taskCode);
if (org.apache.commons.collections.CollectionUtils.isNotEmpty(subsequentNodes)) {
for (Long subCode : subsequentNodes) {
getSwitchNeedWorkCodes(subCode, dag, switchNeedWorkCodes);
}
}
}
private static void setSwitchTaskNodeSkip(Long skipNodeCode,
DAG<Long, TaskNode, TaskNodeRelation> dag,
Map<Long, TaskInstance> completeTaskList,
Map<Long, TaskNode> skipTaskNodeList,
Set<Long> switchNeedWorkCodes) {
// ignore when the node that needs to be skipped exists on the branch that the switch type node needs to execute
if (!dag.containsNode(skipNodeCode) || switchNeedWorkCodes.contains(skipNodeCode)) {
return;
}
skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode));
Collection<Long> postNodeList = dag.getSubsequentNodes(skipNodeCode);
for (Long post : postNodeList) {
TaskNode postNode = dag.getNode(post);
if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) {
setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList);
}
}
}
/** /**
* set task node and the post nodes skip flag * set task node and the post nodes skip flag
*/ */

49
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java

@ -330,6 +330,24 @@ public class DagHelperTest {
Assertions.assertEquals(1, postNodes.size()); Assertions.assertEquals(1, postNodes.size());
} }
@Test
public void testSwitchPostNode() throws IOException {
DAG<Long, TaskNode, TaskNodeRelation> dag = generateDag2();
Map<Long, TaskNode> skipTaskNodeList = new HashMap<>();
Map<Long, TaskInstance> completeTaskList = new HashMap<>();
completeTaskList.put(0l, new TaskInstance());
TaskInstance taskInstance = new TaskInstance();
taskInstance.setState(TaskExecutionStatus.SUCCESS);
taskInstance.setTaskCode(1l);
Map<String, Object> taskParamsMap = new HashMap<>();
taskParamsMap.put(Constants.SWITCH_RESULT, "");
taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
taskInstance.setSwitchDependency(getSwitchNode());
completeTaskList.put(1l, taskInstance);
DagHelper.skipTaskNode4Switch(dag.getNode(1l), skipTaskNodeList, completeTaskList, dag);
Assertions.assertNotNull(skipTaskNodeList.get(2L));
Assertions.assertEquals(1, skipTaskNodeList.size());
}
/** /**
* process: * process:
* 1->2->3->5->7 * 1->2->3->5->7
@ -436,11 +454,13 @@ public class DagHelperTest {
/** /**
* DAG graph: * DAG graph:
* 2 * -> 2->
* * / \
* 0->1(switch) * / \
* * 0->1(switch)->5 6
* 4 * \ /
* \ /
* -> 4->
* *
* @return dag * @return dag
* @throws JsonProcessingException if error throws JsonProcessingException * @throws JsonProcessingException if error throws JsonProcessingException
@ -484,15 +504,26 @@ public class DagHelperTest {
taskNodeList.add(node4); taskNodeList.add(node4);
TaskNode node5 = new TaskNode(); TaskNode node5 = new TaskNode();
node5.setId("4"); node5.setId("5");
node5.setName("4"); node5.setName("5");
node5.setCode(4); node5.setCode(5);
node5.setType("SHELL"); node5.setType("SHELL");
List<Long> dep5 = new ArrayList<>(); List<Long> dep5 = new ArrayList<>();
dep5.add(1L); dep5.add(1L);
node5.setPreTasks(JSONUtils.toJsonString(dep5)); node5.setPreTasks(JSONUtils.toJsonString(dep5));
taskNodeList.add(node5); taskNodeList.add(node5);
TaskNode node6 = new TaskNode();
node5.setId("6");
node5.setName("6");
node5.setCode(6);
node5.setType("SHELL");
List<Long> dep6 = new ArrayList<>();
dep5.add(2L);
dep5.add(4L);
node5.setPreTasks(JSONUtils.toJsonString(dep6));
taskNodeList.add(node6);
List<Long> startNodes = new ArrayList<>(); List<Long> startNodes = new ArrayList<>();
List<Long> recoveryNodes = new ArrayList<>(); List<Long> recoveryNodes = new ArrayList<>();
List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
@ -518,7 +549,7 @@ public class DagHelperTest {
conditionsParameters.setDependTaskList(list); conditionsParameters.setDependTaskList(list);
conditionsParameters.setNextNode(5L); conditionsParameters.setNextNode(5L);
conditionsParameters.setRelation("AND"); conditionsParameters.setRelation("AND");
conditionsParameters.setResultConditionLocation(1);
// in: AND(AND(1 is SUCCESS)) // in: AND(AND(1 is SUCCESS))
return conditionsParameters; return conditionsParameters;
} }

Loading…
Cancel
Save