From 7bcec7115aa5967bed9208ee24a567899fb4f84d Mon Sep 17 00:00:00 2001 From: WangJPLeo <103574007+WangJPLeo@users.noreply.github.com> Date: Mon, 25 Apr 2022 15:29:18 +0800 Subject: [PATCH] [Fix-9717] The failure policy of the task flow takes effect (#9718) * Failure policy takes effect. * Coverage on New Code * correct description logic * Compatible with all scenarios * clearer logic Co-authored-by: WangJPLeo --- .../dolphinscheduler/dao/utils/DagHelper.java | 39 ++++++++------ .../dao/utils/DagHelperTest.java | 52 +++++++++++++++++++ .../master/runner/WorkflowExecuteThread.java | 6 +-- 3 files changed, 79 insertions(+), 18 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 0531791423..a3bd022192 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; @@ -39,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -534,19 +536,7 @@ public class DagHelper { public static boolean haveConditionsAfterNode(String parentNodeCode, DAG dag ) { - boolean result = false; - Set subsequentNodes = dag.getSubsequentNodes(parentNodeCode); - if (CollectionUtils.isEmpty(subsequentNodes)) { - return result; - } - for (String nodeCode : subsequentNodes) { - TaskNode taskNode = dag.getNode(nodeCode); - List preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class); - if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) { - return true; - } - } - return result; + return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_CONDITIONS); } /** @@ -565,19 +555,38 @@ public class DagHelper { return false; } + /** * is there have blocking node after the parent node */ public static boolean haveBlockingAfterNode(String parentNodeCode, DAG dag) { + return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_BLOCKING); + } + + /** + * is there have all node after the parent node + */ + public static boolean haveAllNodeAfterNode(String parentNodeCode, + DAG dag) { + return haveSubAfterNode(parentNodeCode, dag, null); + } + + /** + * Whether there is a specified type of child node after the parent node + */ + public static boolean haveSubAfterNode(String parentNodeCode, + DAG dag, String filterNodeType) { Set subsequentNodes = dag.getSubsequentNodes(parentNodeCode); if (CollectionUtils.isEmpty(subsequentNodes)) { return false; } + if (StringUtils.isBlank(filterNodeType)){ + return true; + } for (String nodeName : subsequentNodes) { TaskNode taskNode = dag.getNode(nodeName); - List preTaskList = JSONUtils.toList(taskNode.getPreTasks(),String.class); - if (preTaskList.contains(parentNodeCode) && taskNode.isBlockingTask()) { + if (taskNode.getType().equalsIgnoreCase(filterNodeType)){ return true; } } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java index 1d374d9951..e0dafeb5bf 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; @@ -48,6 +49,57 @@ import com.fasterxml.jackson.core.JsonProcessingException; * dag helper test */ public class DagHelperTest { + + @Test + public void testHaveSubAfterNode(){ + String parentNodeCode = "5293789969856"; + List taskNodeRelations = new ArrayList<>(); + TaskNodeRelation relation = new TaskNodeRelation(); + relation.setStartNode("5293789969856"); + relation.setEndNode("5293789969857"); + taskNodeRelations.add(relation); + + TaskNodeRelation relationNext = new TaskNodeRelation(); + relationNext.setStartNode("5293789969856"); + relationNext.setEndNode("5293789969858"); + taskNodeRelations.add(relationNext); + + List taskNodes = new ArrayList<>(); + TaskNode node = new TaskNode(); + node.setCode(5293789969856L); + node.setType("SHELL"); + + TaskNode subNode = new TaskNode(); + subNode.setCode(5293789969857L); + subNode.setType("BLOCKING"); + subNode.setPreTasks("[5293789969856]"); + + TaskNode subNextNode = new TaskNode(); + subNextNode.setCode(5293789969858L); + subNextNode.setType("CONDITIONS"); + subNextNode.setPreTasks("[5293789969856]"); + + taskNodes.add(node); + taskNodes.add(subNode); + taskNodes.add(subNextNode); + + ProcessDag processDag = new ProcessDag(); + processDag.setEdges(taskNodeRelations); + processDag.setNodes(taskNodes); + DAG dag = DagHelper.buildDagGraph(processDag); + boolean canSubmit = DagHelper.haveAllNodeAfterNode(parentNodeCode, dag); + Assert.assertTrue(canSubmit); + + boolean haveBlocking = DagHelper.haveBlockingAfterNode(parentNodeCode, dag); + Assert.assertTrue(haveBlocking); + + boolean haveConditions = DagHelper.haveConditionsAfterNode(parentNodeCode, dag); + Assert.assertTrue(haveConditions); + + boolean dependent = DagHelper.haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_DEPENDENT); + Assert.assertFalse(dependent); + } + /** * test task node can submit * diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 979de3feb4..4d1cb296a7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -456,9 +456,9 @@ public class WorkflowExecuteThread { retryTaskInstance(taskInstance); } else if (taskInstance.getState().typeIsFailure()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); - if (taskInstance.isConditionsTask() - || DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag) - || DagHelper.haveBlockingAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) { + // There are child nodes and the failure policy is: CONTINUE + if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag) + && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { submitPostNode(Long.toString(taskInstance.getTaskCode())); } else { errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());