Browse Source

支持sub-process和condition的TFS

pull/3/MERGE
Zhou Zheng 4 years ago
parent
commit
93653768c8
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
  2. 24
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  3. 6
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
  4. 66
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  5. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
  6. 6
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java

@ -179,7 +179,7 @@ public class TaskInstanceService extends BaseService {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} }
else { else {
// FIXME: 或许应该再加一个状态码 // FIXME: 或许应该再加一个状态码,如果按照process的执行逻辑,那么是不需要的
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
} }

24
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.utils; package org.apache.dolphinscheduler.dao.utils;
import com.amazonaws.services.simpleworkflow.model.TaskList;
import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
@ -262,7 +263,7 @@ public class DagHelper {
* @return start Vertex list * @return start Vertex list
*/ */
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag, public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList){ Map<String, TaskInstance> completeTaskList, List<TaskNode> allNodes){
if(completeTaskList == null){ if(completeTaskList == null){
completeTaskList = new HashMap<>(); completeTaskList = new HashMap<>();
@ -286,10 +287,10 @@ public class DagHelper {
continue; continue;
} }
// then submit the post nodes // then submit the post nodes
Collection<String> postNodes = getStartVertex(start, dag, completeTaskList); Collection<String> postNodes = getStartVertex(start, dag, completeTaskList, allNodes);
for(String post : postNodes){ for(String post : postNodes){
TaskNode postNode = dag.getNode(post); TaskNode postNode = dag.getNode(post);
if(taskNodeCanSubmit(postNode, dag, completeTaskList)){ if(taskNodeCanSubmit(postNode, dag, completeTaskList, allNodes)){
tmpStartVertexs.add(post); tmpStartVertexs.add(post);
} }
} }
@ -307,7 +308,8 @@ public class DagHelper {
*/ */
public static boolean taskNodeCanSubmit(TaskNode taskNode, public static boolean taskNodeCanSubmit(TaskNode taskNode,
DAG<String, TaskNode, TaskNodeRelation> dag, DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) { Map<String, TaskInstance> completeTaskList,
List<TaskNode> allNodes) {
List<String> dependList = taskNode.getDepList(); List<String> dependList = taskNode.getDepList();
if(dependList == null){ if(dependList == null){
@ -316,6 +318,10 @@ public class DagHelper {
for(String dependNodeName : dependList){ for(String dependNodeName : dependList){
TaskNode dependNode = dag.getNode(dependNodeName); TaskNode dependNode = dag.getNode(dependNodeName);
// when executing resume_from_forced_success, depend node may be not in dag
if (dependNode == null) {
dependNode = findNodeByName(allNodes, dependNodeName);
}
if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){ if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){
return false; return false;
} }
@ -324,6 +330,16 @@ public class DagHelper {
} }
/**
* generate process data and return all task nodes
* @param processDefinitionJson process definition json
* @return task nodes list
*/
public static List<TaskNode> getAllTaskNodesFromFlowJson(String processDefinitionJson) {
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
return processData.getTasks();
}
/*** /***
* build dag graph * build dag graph
* @param processDag processDag * @param processDag processDag

6
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -58,21 +58,21 @@ public class DagHelperTest {
node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode nodex = dag.getNode("4"); TaskNode nodex = dag.getNode("4");
nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList, null);
Assert.assertEquals(canSubmit, true); Assert.assertEquals(canSubmit, true);
// 2forbidden, 3 cannot be submit // 2forbidden, 3 cannot be submit
completeTaskList.putIfAbsent("2", new TaskInstance()); completeTaskList.putIfAbsent("2", new TaskInstance());
TaskNode nodey = dag.getNode("4"); TaskNode nodey = dag.getNode("4");
nodey.setRunFlag(""); nodey.setRunFlag("");
canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList, null);
Assert.assertEquals(canSubmit, false); Assert.assertEquals(canSubmit, false);
// 2/3 forbidden submit 5 // 2/3 forbidden submit 5
TaskNode node3 = dag.getNode("3"); TaskNode node3 = dag.getNode("3");
node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode node5 = dag.getNode("5"); TaskNode node5 = dag.getNode("5");
canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList); canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList, null);
Assert.assertEquals(canSubmit, true); Assert.assertEquals(canSubmit, true);
} }

66
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -130,6 +130,11 @@ public class MasterExecThread implements Runnable {
*/ */
private DAG<String,TaskNode,TaskNodeRelation> dag; private DAG<String,TaskNode,TaskNodeRelation> dag;
/**
* all task nodes generated by process instance json
*/
private List<TaskNode> allNodes;
/** /**
* process service * process service
*/ */
@ -364,6 +369,8 @@ public class MasterExecThread implements Runnable {
} }
// generate process dag // generate process dag
dag = DagHelper.buildDagGraph(processDag); dag = DagHelper.buildDagGraph(processDag);
allNodes = DagHelper.getAllTaskNodesFromFlowJson(processInstance.getProcessInstanceJson());
} }
/** /**
@ -533,7 +540,7 @@ public class MasterExecThread implements Runnable {
private void setTaskNodeSkip(List<String> taskNodesSkipList){ private void setTaskNodeSkip(List<String> taskNodesSkipList){
for(String skipNode : taskNodesSkipList){ for(String skipNode : taskNodesSkipList){
skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode)); skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode));
Collection<String> postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList); Collection<String> postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList, allNodes);
List<String> postSkipList = new ArrayList<>(); List<String> postSkipList = new ArrayList<>();
for(String post : postNodeList){ for(String post : postNodeList){
TaskNode postNode = dag.getNode(post); TaskNode postNode = dag.getNode(post);
@ -555,6 +562,10 @@ public class MasterExecThread implements Runnable {
private List<String> parseConditionTask(String nodeName){ private List<String> parseConditionTask(String nodeName){
List<String> conditionTaskList = new ArrayList<>(); List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName); TaskNode taskNode = dag.getNode(nodeName);
if (taskNode == null) {
taskNode = DagHelper.findNodeByName(allNodes, nodeName);
}
if(!taskNode.isConditionsTask()){ if(!taskNode.isConditionsTask()){
return conditionTaskList; return conditionTaskList;
} }
@ -593,7 +604,7 @@ public class MasterExecThread implements Runnable {
if(taskNode != null && taskNode.isConditionsTask()){ if(taskNode != null && taskNode.isConditionsTask()){
return parseConditionTask(previousNodeName); return parseConditionTask(previousNodeName);
} }
Collection<String> postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList); Collection<String> postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList, allNodes);
List<String> postSkipList = new ArrayList<>(); List<String> postSkipList = new ArrayList<>();
// delete success node, parse the past nodes // delete success node, parse the past nodes
// if conditions node, // if conditions node,
@ -672,26 +683,33 @@ public class MasterExecThread implements Runnable {
TaskNode taskNode = dag.getNode(taskName); TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList(); List<String> depNameList = taskNode.getDepList();
for(String depsNode : depNameList ){ for(String depsNode : depNameList ){
if(!dag.containsNode(depsNode) // in this condition, it needs to check the command type
if((!dag.containsNode(depsNode) && !processInstance.getCommandType().equals(CommandType.RESUME_FROM_FORCED_SUCCESS))
|| forbiddenTaskList.containsKey(depsNode) || forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)){ || skipTaskNodeList.containsKey(depsNode)){
continue; continue;
} }
// dependencies must be fully completed // all the dependencies must be fully completed
if(!completeTaskList.containsKey(depsNode)){ if(!completeTaskList.containsKey(depsNode)){
// if dag doesn't contain it either, the depend will never be executed, so return failed
if (!dag.containsNode(depsNode)) {
return DependResult.FAILED;
}
return DependResult.WAITING; return DependResult.WAITING;
} }
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
// conditions task would not return failed.
if(depTaskState.typeIsFailure()
&& !DagHelper.haveConditionsAfterNode(depsNode, dag )
&& !dag.getNode(depsNode).isConditionsTask()){
return DependResult.FAILED;
}
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
return DependResult.WAITING; return DependResult.WAITING;
} }
// ignore task state if current task is condition
if(taskNode.isConditionsTask()){
continue;
}
if(!dependTaskSuccess(depsNode, taskName)){
return DependResult.FAILED;
}
} }
logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray()));
@ -699,6 +717,34 @@ public class MasterExecThread implements Runnable {
return DependResult.SUCCESS; return DependResult.SUCCESS;
} }
/**
* depend node is completed, but here need check the condition task branch is the next node
* @param dependNodeName
* @param nextNodeName
* @return
*/
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){
TaskNode tmpNode = dag.getNode(dependNodeName);
// if dag doesn't contain depend node
if (tmpNode == null) {
tmpNode = DagHelper.findNodeByName(allNodes, dependNodeName);
}
if(tmpNode.isConditionsTask()){
//condition task need check the branch to run
List<String> nextTaskList = parseConditionTask(dependNodeName);
if(!nextTaskList.contains(nextNodeName)){
return false;
}
}else {
ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
if(depTaskState.typeIsFailure()){
return false;
}
}
return true;
}
/** /**
* query task instance by complete state * query task instance by complete state

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java

@ -121,7 +121,7 @@ public class MasterCommandTest {
new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST); new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST);
DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag); DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
Collection<String> start = DagHelper.getStartVertex("1", dag, null); Collection<String> start = DagHelper.getStartVertex("1", dag, null, null);
System.out.println(start.toString()); System.out.println(start.toString());

6
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1290,6 +1290,12 @@ public class ProcessService {
return taskInstanceMapper.queryTaskByPIdAndStateAndType(instanceId, state.ordinal(), taskType.toString()); return taskInstanceMapper.queryTaskByPIdAndStateAndType(instanceId, state.ordinal(), taskType.toString());
} }
/**
* get tasks in sub_process by sub_process task id and state
* @param taskId task id
* @param state task instance state
* @return task instance id list
*/
public List<Integer> findTaskIdBySubProcessTaskIdAndState(int taskId, ExecutionStatus state){ public List<Integer> findTaskIdBySubProcessTaskIdAndState(int taskId, ExecutionStatus state){
return taskInstanceMapper.queryTasksBySubProcessTaskIdAndState(taskId, state.ordinal()); return taskInstanceMapper.queryTasksBySubProcessTaskIdAndState(taskId, state.ordinal());
} }

Loading…
Cancel
Save