From 561e5b2e2a6d5c45992f957278dbc8654f937944 Mon Sep 17 00:00:00 2001 From: liliang1991 Date: Fri, 20 Nov 2020 16:10:02 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=8D=95=E7=8B=AC?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=AD=90=E8=8A=82=E7=82=B9=E7=A9=BA=E6=8C=87?= =?UTF-8?q?=E9=92=88=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/apache/dolphinscheduler/dao/utils/DagHelper.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index bd5d888a69..7d622b33fc 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -276,6 +276,9 @@ public class DagHelper { } for (String dependNodeName : dependList) { TaskNode dependNode = dag.getNode(dependNodeName); + if(dependNode==null){ + continue; + } if (completeTaskList.containsKey(dependNodeName) || dependNode.isForbidden() || skipTaskNodeList.containsKey(dependNodeName)) { From 78972144e0cbca48e82acac1bf4be16e0206a551 Mon Sep 17 00:00:00 2001 From: liliang1991 Date: Sat, 21 Nov 2020 16:28:53 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=8D=95=E7=8B=AC?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=AD=90=E8=8A=82=E7=82=B9=E7=A9=BA=E6=8C=87?= =?UTF-8?q?=E9=92=88=E7=9A=84=E9=97=AE=E9=A2=98(=E5=A2=9E=E5=8A=A0checksty?= =?UTF-8?q?le)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dolphinscheduler/dao/utils/DagHelper.java | 44 ++++++------------- 1 file changed, 13 insertions(+), 31 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 7d622b33fc..47d131784b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -70,10 +70,10 @@ public class DagHelper { /** * generate task nodes needed by dag * - * @param taskNodeList taskNodeList - * @param startNodeNameList startNodeNameList + * @param taskNodeList taskNodeList + * @param startNodeNameList startNodeNameList * @param recoveryNodeNameList recoveryNodeNameList - * @param taskDependType taskDependType + * @param taskDependType taskDependType * @return task node list */ public static List generateFlowNodeListByStartNode(List taskNodeList, List startNodeNameList, @@ -131,7 +131,7 @@ public class DagHelper { /** * find all the nodes that depended on the start node * - * @param startNode startNode + * @param startNode startNode * @param taskNodeList taskNodeList * @return task node list */ @@ -156,9 +156,9 @@ public class DagHelper { /** * find all nodes that start nodes depend on. * - * @param startNode startNode + * @param startNode startNode * @param recoveryNodeNameList recoveryNodeNameList - * @param taskNodeList taskNodeList + * @param taskNodeList taskNodeList * @return task node list */ private static List getFlowNodeListPre(TaskNode startNode, List recoveryNodeNameList, List taskNodeList, List visitedNodeNameList) { @@ -192,9 +192,9 @@ public class DagHelper { * generate dag by start nodes and recovery nodes * * @param processDefinitionJson processDefinitionJson - * @param startNodeNameList startNodeNameList - * @param recoveryNodeNameList recoveryNodeNameList - * @param depNodeType depNodeType + * @param startNodeNameList startNodeNameList + * @param recoveryNodeNameList recoveryNodeNameList + * @param depNodeType depNodeType * @return process dag * @throws Exception if error throws Exception */ @@ -246,7 +246,7 @@ public class DagHelper { * find node by node name * * @param nodeDetails nodeDetails - * @param nodeName nodeName + * @param nodeName nodeName * @return task node */ public static TaskNode findNodeByName(List nodeDetails, String nodeName) { @@ -261,8 +261,8 @@ public class DagHelper { /** * the task can be submit when all the depends nodes are forbidden or complete * - * @param taskNode taskNode - * @param dag dag + * @param taskNode taskNode + * @param dag dag * @param completeTaskList completeTaskList * @return can submit */ @@ -276,7 +276,7 @@ public class DagHelper { } for (String dependNodeName : dependList) { TaskNode dependNode = dag.getNode(dependNodeName); - if(dependNode==null){ + if (dependNode == null) { continue; } if (completeTaskList.containsKey(dependNodeName) @@ -295,7 +295,6 @@ public class DagHelper { * this function parse the condition node to find the right branch. * also check all the depends nodes forbidden or complete * - * @param preNodeName * @return successor nodes */ public static Set parsePostNodes(String preNodeName, @@ -332,9 +331,6 @@ public class DagHelper { /** * if all of the task dependence are skipped, skip it too. - * - * @param taskNode - * @return */ private static boolean isTaskNodeNeedSkip(TaskNode taskNode, Map skipTaskNodeList @@ -354,9 +350,6 @@ public class DagHelper { /** * parse condition task find the branch process * set skip flag for another one. - * - * @param nodeName - * @return */ public static List parseConditionTask(String nodeName, Map skipTaskNodeList, @@ -391,11 +384,6 @@ public class DagHelper { /** * set task node and the post nodes skip flag - * - * @param skipNodeName - * @param dag - * @param completeTaskList - * @param skipTaskNodeList */ private static void setTaskNodeSkip(String skipNodeName, DAG dag, @@ -467,9 +455,6 @@ public class DagHelper { /** * is there have conditions after the parent node - * - * @param parentNodeName - * @return */ public static boolean haveConditionsAfterNode(String parentNodeName, DAG dag @@ -491,9 +476,6 @@ public class DagHelper { /** * is there have conditions after the parent node - * - * @param parentNodeName - * @return */ public static boolean haveConditionsAfterNode(String parentNodeName, List taskNodes From 86567e041cdbbbcbc62fd473c7e4f31832736746 Mon Sep 17 00:00:00 2001 From: "liang.li.c" Date: Mon, 23 Nov 2020 16:50:14 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=8D=95=E7=8B=AC?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=AD=90=E8=8A=82=E7=82=B9=E7=A9=BA=E6=8C=87?= =?UTF-8?q?=E9=92=88=E7=9A=84=E9=97=AE=E9=A2=98(=E5=A2=9E=E5=8A=A0checksty?= =?UTF-8?q?le)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dolphinscheduler/dao/utils/DagHelper.java | 47 ++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 47d131784b..9a772322e8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -70,10 +70,10 @@ public class DagHelper { /** * generate task nodes needed by dag * - * @param taskNodeList taskNodeList - * @param startNodeNameList startNodeNameList + * @param taskNodeList taskNodeList + * @param startNodeNameList startNodeNameList * @param recoveryNodeNameList recoveryNodeNameList - * @param taskDependType taskDependType + * @param taskDependType taskDependType * @return task node list */ public static List generateFlowNodeListByStartNode(List taskNodeList, List startNodeNameList, @@ -131,7 +131,7 @@ public class DagHelper { /** * find all the nodes that depended on the start node * - * @param startNode startNode + * @param startNode startNode * @param taskNodeList taskNodeList * @return task node list */ @@ -156,9 +156,9 @@ public class DagHelper { /** * find all nodes that start nodes depend on. * - * @param startNode startNode + * @param startNode startNode * @param recoveryNodeNameList recoveryNodeNameList - * @param taskNodeList taskNodeList + * @param taskNodeList taskNodeList * @return task node list */ private static List getFlowNodeListPre(TaskNode startNode, List recoveryNodeNameList, List taskNodeList, List visitedNodeNameList) { @@ -192,9 +192,9 @@ public class DagHelper { * generate dag by start nodes and recovery nodes * * @param processDefinitionJson processDefinitionJson - * @param startNodeNameList startNodeNameList - * @param recoveryNodeNameList recoveryNodeNameList - * @param depNodeType depNodeType + * @param startNodeNameList startNodeNameList + * @param recoveryNodeNameList recoveryNodeNameList + * @param depNodeType depNodeType * @return process dag * @throws Exception if error throws Exception */ @@ -246,7 +246,7 @@ public class DagHelper { * find node by node name * * @param nodeDetails nodeDetails - * @param nodeName nodeName + * @param nodeName nodeName * @return task node */ public static TaskNode findNodeByName(List nodeDetails, String nodeName) { @@ -261,8 +261,8 @@ public class DagHelper { /** * the task can be submit when all the depends nodes are forbidden or complete * - * @param taskNode taskNode - * @param dag dag + * @param taskNode taskNode + * @param dag dag * @param completeTaskList completeTaskList * @return can submit */ @@ -276,10 +276,7 @@ public class DagHelper { } for (String dependNodeName : dependList) { TaskNode dependNode = dag.getNode(dependNodeName); - if (dependNode == null) { - continue; - } - if (completeTaskList.containsKey(dependNodeName) + if (dependNode == null || completeTaskList.containsKey(dependNodeName) || dependNode.isForbidden() || skipTaskNodeList.containsKey(dependNodeName)) { continue; @@ -295,6 +292,7 @@ public class DagHelper { * this function parse the condition node to find the right branch. * also check all the depends nodes forbidden or complete * + * @param preNodeName * @return successor nodes */ public static Set parsePostNodes(String preNodeName, @@ -331,6 +329,9 @@ public class DagHelper { /** * if all of the task dependence are skipped, skip it too. + * + * @param taskNode + * @return */ private static boolean isTaskNodeNeedSkip(TaskNode taskNode, Map skipTaskNodeList @@ -350,6 +351,9 @@ public class DagHelper { /** * parse condition task find the branch process * set skip flag for another one. + * + * @param nodeName + * @return */ public static List parseConditionTask(String nodeName, Map skipTaskNodeList, @@ -384,6 +388,11 @@ public class DagHelper { /** * set task node and the post nodes skip flag + * + * @param skipNodeName + * @param dag + * @param completeTaskList + * @param skipTaskNodeList */ private static void setTaskNodeSkip(String skipNodeName, DAG dag, @@ -455,6 +464,9 @@ public class DagHelper { /** * is there have conditions after the parent node + * + * @param parentNodeName + * @return */ public static boolean haveConditionsAfterNode(String parentNodeName, DAG dag @@ -476,6 +488,9 @@ public class DagHelper { /** * is there have conditions after the parent node + * + * @param parentNodeName + * @return */ public static boolean haveConditionsAfterNode(String parentNodeName, List taskNodes