From 93653768c813640da7cb3de6cbe15cd8c8aa72a5 Mon Sep 17 00:00:00 2001 From: Zhou Zheng <1606079777@qq.comom> Date: Tue, 4 Aug 2020 20:14:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81sub-process=E5=92=8Ccondition?= =?UTF-8?q?=E7=9A=84TFS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/service/TaskInstanceService.java | 2 +- .../dolphinscheduler/dao/utils/DagHelper.java | 24 +++++-- .../dao/utils/DagHelperTest.java | 6 +- .../master/runner/MasterExecThread.java | 66 ++++++++++++++++--- .../server/master/MasterCommandTest.java | 2 +- .../service/process/ProcessService.java | 6 ++ 6 files changed, 87 insertions(+), 19 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index 12fd60977c..2140af497f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -179,7 +179,7 @@ public class TaskInstanceService extends BaseService { putMsg(result, Status.SUCCESS); } else { - // FIXME: 或许应该再加一个状态码 + // FIXME: 或许应该再加一个状态码,如果按照process的执行逻辑,那么是不需要的 putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index d3b829cb4f..1901af318a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.utils; +import com.amazonaws.services.simpleworkflow.model.TaskList; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -262,7 +263,7 @@ public class DagHelper { * @return start Vertex list */ public static Collection getStartVertex(String parentNodeName, DAG dag, - Map completeTaskList){ + Map completeTaskList, List allNodes){ if(completeTaskList == null){ completeTaskList = new HashMap<>(); @@ -286,10 +287,10 @@ public class DagHelper { continue; } // then submit the post nodes - Collection postNodes = getStartVertex(start, dag, completeTaskList); + Collection postNodes = getStartVertex(start, dag, completeTaskList, allNodes); for(String post : postNodes){ TaskNode postNode = dag.getNode(post); - if(taskNodeCanSubmit(postNode, dag, completeTaskList)){ + if(taskNodeCanSubmit(postNode, dag, completeTaskList, allNodes)){ tmpStartVertexs.add(post); } } @@ -307,7 +308,8 @@ public class DagHelper { */ public static boolean taskNodeCanSubmit(TaskNode taskNode, DAG dag, - Map completeTaskList) { + Map completeTaskList, + List allNodes) { List dependList = taskNode.getDepList(); if(dependList == null){ @@ -316,6 +318,10 @@ public class DagHelper { for(String dependNodeName : dependList){ TaskNode dependNode = dag.getNode(dependNodeName); + // when executing resume_from_forced_success, depend node may be not in dag + if (dependNode == null) { + dependNode = findNodeByName(allNodes, dependNodeName); + } if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){ return false; } @@ -324,6 +330,16 @@ public class DagHelper { } + /** + * generate process data and return all task nodes + * @param processDefinitionJson process definition json + * @return task nodes list + */ + public static List getAllTaskNodesFromFlowJson(String processDefinitionJson) { + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + return processData.getTasks(); + } + /*** * build dag graph * @param processDag processDag diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java index 14dfe0b750..0f1fe9234f 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java @@ -58,21 +58,21 @@ public class DagHelperTest { node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); TaskNode nodex = dag.getNode("4"); nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); + canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList, null); Assert.assertEquals(canSubmit, true); // 2forbidden, 3 cannot be submit completeTaskList.putIfAbsent("2", new TaskInstance()); TaskNode nodey = dag.getNode("4"); nodey.setRunFlag(""); - canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); + canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList, null); Assert.assertEquals(canSubmit, false); // 2/3 forbidden submit 5 TaskNode node3 = dag.getNode("3"); node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); TaskNode node5 = dag.getNode("5"); - canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList); + canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList, null); Assert.assertEquals(canSubmit, true); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 4b5c3f7d00..b37f6f66c2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -130,6 +130,11 @@ public class MasterExecThread implements Runnable { */ private DAG dag; + /** + * all task nodes generated by process instance json + */ + private List allNodes; + /** * process service */ @@ -364,6 +369,8 @@ public class MasterExecThread implements Runnable { } // generate process dag dag = DagHelper.buildDagGraph(processDag); + + allNodes = DagHelper.getAllTaskNodesFromFlowJson(processInstance.getProcessInstanceJson()); } /** @@ -533,7 +540,7 @@ public class MasterExecThread implements Runnable { private void setTaskNodeSkip(List taskNodesSkipList){ for(String skipNode : taskNodesSkipList){ skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode)); - Collection postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList); + Collection postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList, allNodes); List postSkipList = new ArrayList<>(); for(String post : postNodeList){ TaskNode postNode = dag.getNode(post); @@ -555,6 +562,10 @@ public class MasterExecThread implements Runnable { private List parseConditionTask(String nodeName){ List conditionTaskList = new ArrayList<>(); TaskNode taskNode = dag.getNode(nodeName); + if (taskNode == null) { + taskNode = DagHelper.findNodeByName(allNodes, nodeName); + } + if(!taskNode.isConditionsTask()){ return conditionTaskList; } @@ -593,7 +604,7 @@ public class MasterExecThread implements Runnable { if(taskNode != null && taskNode.isConditionsTask()){ return parseConditionTask(previousNodeName); } - Collection postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList); + Collection postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList, allNodes); List postSkipList = new ArrayList<>(); // delete success node, parse the past nodes // if conditions node, @@ -672,26 +683,33 @@ public class MasterExecThread implements Runnable { TaskNode taskNode = dag.getNode(taskName); List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ - if(!dag.containsNode(depsNode) + // in this condition, it needs to check the command type + if((!dag.containsNode(depsNode) && !processInstance.getCommandType().equals(CommandType.RESUME_FROM_FORCED_SUCCESS)) || forbiddenTaskList.containsKey(depsNode) || skipTaskNodeList.containsKey(depsNode)){ continue; } - // dependencies must be fully completed + // all the dependencies must be fully completed if(!completeTaskList.containsKey(depsNode)){ + // if dag doesn't contain it either, the depend will never be executed, so return failed + if (!dag.containsNode(depsNode)) { + return DependResult.FAILED; + } return DependResult.WAITING; } ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); - // conditions task would not return failed. - if(depTaskState.typeIsFailure() - && !DagHelper.haveConditionsAfterNode(depsNode, dag ) - && !dag.getNode(depsNode).isConditionsTask()){ - return DependResult.FAILED; - } if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; } + + // ignore task state if current task is condition + if(taskNode.isConditionsTask()){ + continue; + } + if(!dependTaskSuccess(depsNode, taskName)){ + return DependResult.FAILED; + } } logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); @@ -699,6 +717,34 @@ public class MasterExecThread implements Runnable { return DependResult.SUCCESS; } + /** + * depend node is completed, but here need check the condition task branch is the next node + * @param dependNodeName + * @param nextNodeName + * @return + */ + private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){ + TaskNode tmpNode = dag.getNode(dependNodeName); + // if dag doesn't contain depend node + if (tmpNode == null) { + tmpNode = DagHelper.findNodeByName(allNodes, dependNodeName); + } + + if(tmpNode.isConditionsTask()){ + //condition task need check the branch to run + List nextTaskList = parseConditionTask(dependNodeName); + if(!nextTaskList.contains(nextNodeName)){ + return false; + } + }else { + ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState(); + if(depTaskState.typeIsFailure()){ + return false; + } + } + return true; + } + /** * query task instance by complete state diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java index d541f43a3b..c36521b855 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java @@ -121,7 +121,7 @@ public class MasterCommandTest { new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST); DAG dag = DagHelper.buildDagGraph(processDag); - Collection start = DagHelper.getStartVertex("1", dag, null); + Collection start = DagHelper.getStartVertex("1", dag, null, null); System.out.println(start.toString()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 8a379ee446..7ccc937a02 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1290,6 +1290,12 @@ public class ProcessService { return taskInstanceMapper.queryTaskByPIdAndStateAndType(instanceId, state.ordinal(), taskType.toString()); } + /** + * get tasks in sub_process by sub_process task id and state + * @param taskId task id + * @param state task instance state + * @return task instance id list + */ public List findTaskIdBySubProcessTaskIdAndState(int taskId, ExecutionStatus state){ return taskInstanceMapper.queryTasksBySubProcessTaskIdAndState(taskId, state.ordinal()); }