From 079768fdaaadd8b601c80afcbb2606afb15aba3b Mon Sep 17 00:00:00 2001 From: Zhou Zheng <1606079777@qq.comom> Date: Fri, 7 Aug 2020 18:05:04 +0800 Subject: [PATCH] revert --- .../dolphinscheduler/dao/utils/DagHelper.java | 23 +++------------- .../dao/utils/DagHelperTest.java | 6 ++--- .../master/runner/MasterExecThread.java | 26 +++---------------- .../server/master/MasterCommandTest.java | 2 +- 4 files changed, 11 insertions(+), 46 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 1901af318a..a16c14ebc1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -263,7 +263,7 @@ public class DagHelper { * @return start Vertex list */ public static Collection getStartVertex(String parentNodeName, DAG dag, - Map completeTaskList, List allNodes){ + Map completeTaskList){ if(completeTaskList == null){ completeTaskList = new HashMap<>(); @@ -287,10 +287,10 @@ public class DagHelper { continue; } // then submit the post nodes - Collection postNodes = getStartVertex(start, dag, completeTaskList, allNodes); + Collection postNodes = getStartVertex(start, dag, completeTaskList); for(String post : postNodes){ TaskNode postNode = dag.getNode(post); - if(taskNodeCanSubmit(postNode, dag, completeTaskList, allNodes)){ + if(taskNodeCanSubmit(postNode, dag, completeTaskList)){ tmpStartVertexs.add(post); } } @@ -308,8 +308,7 @@ public class DagHelper { */ public static boolean taskNodeCanSubmit(TaskNode taskNode, DAG dag, - Map completeTaskList, - List allNodes) { + Map completeTaskList) { List dependList = taskNode.getDepList(); if(dependList == null){ @@ -318,10 +317,6 @@ public class DagHelper { for(String dependNodeName : dependList){ TaskNode dependNode = dag.getNode(dependNodeName); - // when executing resume_from_forced_success, depend node may be not in dag - if (dependNode == null) { - dependNode = findNodeByName(allNodes, dependNodeName); - } if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){ return false; } @@ -330,16 +325,6 @@ public class DagHelper { } - /** - * generate process data and return all task nodes - * @param processDefinitionJson process definition json - * @return task nodes list - */ - public static List getAllTaskNodesFromFlowJson(String processDefinitionJson) { - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - return processData.getTasks(); - } - /*** * build dag graph * @param processDag processDag diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java index 0f1fe9234f..14dfe0b750 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java @@ -58,21 +58,21 @@ public class DagHelperTest { node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); TaskNode nodex = dag.getNode("4"); nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList, null); + canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); Assert.assertEquals(canSubmit, true); // 2forbidden, 3 cannot be submit completeTaskList.putIfAbsent("2", new TaskInstance()); TaskNode nodey = dag.getNode("4"); nodey.setRunFlag(""); - canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList, null); + canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); Assert.assertEquals(canSubmit, false); // 2/3 forbidden submit 5 TaskNode node3 = dag.getNode("3"); node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); TaskNode node5 = dag.getNode("5"); - canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList, null); + canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList); Assert.assertEquals(canSubmit, true); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 4b039f1a93..a25ae10ad3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -130,11 +130,6 @@ public class MasterExecThread implements Runnable { */ private DAG dag; - /** - * all task nodes generated by process instance json - */ - private List allNodes; - /** * process service */ @@ -369,8 +364,6 @@ public class MasterExecThread implements Runnable { } // generate process dag dag = DagHelper.buildDagGraph(processDag); - - allNodes = DagHelper.getAllTaskNodesFromFlowJson(processInstance.getProcessInstanceJson()); } /** @@ -540,7 +533,7 @@ public class MasterExecThread implements Runnable { private void setTaskNodeSkip(List taskNodesSkipList){ for(String skipNode : taskNodesSkipList){ skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode)); - Collection postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList, allNodes); + Collection postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList); List postSkipList = new ArrayList<>(); for(String post : postNodeList){ TaskNode postNode = dag.getNode(post); @@ -562,10 +555,6 @@ public class MasterExecThread implements Runnable { private List parseConditionTask(String nodeName){ List conditionTaskList = new ArrayList<>(); TaskNode taskNode = dag.getNode(nodeName); - if (taskNode == null) { - taskNode = DagHelper.findNodeByName(allNodes, nodeName); - } - if(!taskNode.isConditionsTask()){ return conditionTaskList; } @@ -604,7 +593,7 @@ public class MasterExecThread implements Runnable { if(taskNode != null && taskNode.isConditionsTask()){ return parseConditionTask(previousNodeName); } - Collection postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList, allNodes); + Collection postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList); List postSkipList = new ArrayList<>(); // delete success node, parse the past nodes // if conditions node, @@ -683,18 +672,13 @@ public class MasterExecThread implements Runnable { TaskNode taskNode = dag.getNode(taskName); List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ - // in this condition, it needs to check the command type - if((!dag.containsNode(depsNode) && !processInstance.getCommandType().equals(CommandType.RESUME_FROM_FORCED_SUCCESS)) + if(!dag.containsNode(depsNode) || forbiddenTaskList.containsKey(depsNode) || skipTaskNodeList.containsKey(depsNode)){ continue; } // all the dependencies must be fully completed if(!completeTaskList.containsKey(depsNode)){ - // if dag doesn't contain it either, the depend will never be executed, so return failed - if (!dag.containsNode(depsNode)) { - return DependResult.FAILED; - } return DependResult.WAITING; } ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); @@ -725,10 +709,6 @@ public class MasterExecThread implements Runnable { */ private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){ TaskNode tmpNode = dag.getNode(dependNodeName); - // if dag doesn't contain depend node - if (tmpNode == null) { - tmpNode = DagHelper.findNodeByName(allNodes, dependNodeName); - } if(tmpNode.isConditionsTask()){ //condition task need check the branch to run diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java index c36521b855..d541f43a3b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java @@ -121,7 +121,7 @@ public class MasterCommandTest { new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST); DAG dag = DagHelper.buildDagGraph(processDag); - Collection start = DagHelper.getStartVertex("1", dag, null, null); + Collection start = DagHelper.getStartVertex("1", dag, null); System.out.println(start.toString());