Browse Source

revert

pull/3/MERGE
Zhou Zheng 4 years ago
parent
commit
079768fdaa
  1. 23
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  2. 6
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
  3. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  4. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java

23
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -263,7 +263,7 @@ public class DagHelper {
* @return start Vertex list
*/
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList, List<TaskNode> allNodes){
Map<String, TaskInstance> completeTaskList){
if(completeTaskList == null){
completeTaskList = new HashMap<>();
@ -287,10 +287,10 @@ public class DagHelper {
continue;
}
// then submit the post nodes
Collection<String> postNodes = getStartVertex(start, dag, completeTaskList, allNodes);
Collection<String> postNodes = getStartVertex(start, dag, completeTaskList);
for(String post : postNodes){
TaskNode postNode = dag.getNode(post);
if(taskNodeCanSubmit(postNode, dag, completeTaskList, allNodes)){
if(taskNodeCanSubmit(postNode, dag, completeTaskList)){
tmpStartVertexs.add(post);
}
}
@ -308,8 +308,7 @@ public class DagHelper {
*/
public static boolean taskNodeCanSubmit(TaskNode taskNode,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList,
List<TaskNode> allNodes) {
Map<String, TaskInstance> completeTaskList) {
List<String> dependList = taskNode.getDepList();
if(dependList == null){
@ -318,10 +317,6 @@ public class DagHelper {
for(String dependNodeName : dependList){
TaskNode dependNode = dag.getNode(dependNodeName);
// when executing resume_from_forced_success, depend node may be not in dag
if (dependNode == null) {
dependNode = findNodeByName(allNodes, dependNodeName);
}
if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){
return false;
}
@ -330,16 +325,6 @@ public class DagHelper {
}
/**
* generate process data and return all task nodes
* @param processDefinitionJson process definition json
* @return task nodes list
*/
public static List<TaskNode> getAllTaskNodesFromFlowJson(String processDefinitionJson) {
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
return processData.getTasks();
}
/***
* build dag graph
* @param processDag processDag

6
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -58,21 +58,21 @@ public class DagHelperTest {
node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode nodex = dag.getNode("4");
nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList, null);
canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList);
Assert.assertEquals(canSubmit, true);
// 2forbidden, 3 cannot be submit
completeTaskList.putIfAbsent("2", new TaskInstance());
TaskNode nodey = dag.getNode("4");
nodey.setRunFlag("");
canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList, null);
canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList);
Assert.assertEquals(canSubmit, false);
// 2/3 forbidden submit 5
TaskNode node3 = dag.getNode("3");
node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode node5 = dag.getNode("5");
canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList, null);
canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList);
Assert.assertEquals(canSubmit, true);
}

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -130,11 +130,6 @@ public class MasterExecThread implements Runnable {
*/
private DAG<String,TaskNode,TaskNodeRelation> dag;
/**
* all task nodes generated by process instance json
*/
private List<TaskNode> allNodes;
/**
* process service
*/
@ -369,8 +364,6 @@ public class MasterExecThread implements Runnable {
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
allNodes = DagHelper.getAllTaskNodesFromFlowJson(processInstance.getProcessInstanceJson());
}
/**
@ -540,7 +533,7 @@ public class MasterExecThread implements Runnable {
private void setTaskNodeSkip(List<String> taskNodesSkipList){
for(String skipNode : taskNodesSkipList){
skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode));
Collection<String> postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList, allNodes);
Collection<String> postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList);
List<String> postSkipList = new ArrayList<>();
for(String post : postNodeList){
TaskNode postNode = dag.getNode(post);
@ -562,10 +555,6 @@ public class MasterExecThread implements Runnable {
private List<String> parseConditionTask(String nodeName){
List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
if (taskNode == null) {
taskNode = DagHelper.findNodeByName(allNodes, nodeName);
}
if(!taskNode.isConditionsTask()){
return conditionTaskList;
}
@ -604,7 +593,7 @@ public class MasterExecThread implements Runnable {
if(taskNode != null && taskNode.isConditionsTask()){
return parseConditionTask(previousNodeName);
}
Collection<String> postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList, allNodes);
Collection<String> postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList);
List<String> postSkipList = new ArrayList<>();
// delete success node, parse the past nodes
// if conditions node,
@ -683,18 +672,13 @@ public class MasterExecThread implements Runnable {
TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList();
for(String depsNode : depNameList ){
// in this condition, it needs to check the command type
if((!dag.containsNode(depsNode) && !processInstance.getCommandType().equals(CommandType.RESUME_FROM_FORCED_SUCCESS))
if(!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)){
continue;
}
// all the dependencies must be fully completed
if(!completeTaskList.containsKey(depsNode)){
// if dag doesn't contain it either, the depend will never be executed, so return failed
if (!dag.containsNode(depsNode)) {
return DependResult.FAILED;
}
return DependResult.WAITING;
}
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
@ -725,10 +709,6 @@ public class MasterExecThread implements Runnable {
*/
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){
TaskNode tmpNode = dag.getNode(dependNodeName);
// if dag doesn't contain depend node
if (tmpNode == null) {
tmpNode = DagHelper.findNodeByName(allNodes, dependNodeName);
}
if(tmpNode.isConditionsTask()){
//condition task need check the branch to run

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java

@ -121,7 +121,7 @@ public class MasterCommandTest {
new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST);
DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
Collection<String> start = DagHelper.getStartVertex("1", dag, null, null);
Collection<String> start = DagHelper.getStartVertex("1", dag, null);
System.out.println(start.toString());

Loading…
Cancel
Save