Browse Source

Merge pull request #3375 from lenboo/132

[BUG-3324][SERVER]: there exists some problems in checking task dependency
pull/3/MERGE
xingchun-chen 4 years ago committed by GitHub
parent
commit
bb9f81b725
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 47
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

47
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -663,37 +663,58 @@ public class MasterExecThread implements Runnable {
if(startNodes.contains(taskName)){ if(startNodes.contains(taskName)){
return DependResult.SUCCESS; return DependResult.SUCCESS;
} }
TaskNode taskNode = dag.getNode(taskName); TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList(); List<String> depNameList = taskNode.getDepList();
for(String depsNode : depNameList ){ for(String depsNode : depNameList ){
if(!dag.containsNode(depsNode) if(!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode) || forbiddenTaskList.containsKey(depsNode)){
|| skipTaskNodeList.containsKey(depsNode)){
continue; continue;
} }
// dependencies must be fully completed if(skipTaskNodeList.containsKey(depsNode)){
return DependResult.FAILED;
}
// all the dependencies must be completed
if(!completeTaskList.containsKey(depsNode)){ if(!completeTaskList.containsKey(depsNode)){
return DependResult.WAITING; return DependResult.WAITING;
} }
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
// conditions task would not return failed.
if(depTaskState.typeIsFailure()
&& !DagHelper.haveConditionsAfterNode(depsNode, dag )
&& !dag.getNode(depsNode).isConditionsTask()){
return DependResult.FAILED;
}
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
return DependResult.WAITING; return DependResult.WAITING;
} }
// ignore task state if current task is condition
if(taskNode.isConditionsTask()){
continue;
}
if(!dependTaskSuccess(depsNode, taskName)){
return DependResult.FAILED;
}
} }
logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray()));
return DependResult.SUCCESS; return DependResult.SUCCESS;
} }
/**
* depend node is completed, but here need check the condition task branch is the next node
* @param dependNodeName
* @param nextNodeName
* @return
*/
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){
if(dag.getNode(dependNodeName).isConditionsTask()){
//condition task need check the branch to run
List<String> nextTaskList = parseConditionTask(dependNodeName);
if(!nextTaskList.contains(nextNodeName)){
return false;
}
}else {
ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
if(depTaskState.typeIsFailure()){
return false;
}
}
return true;
}
/** /**
* query task instance by complete state * query task instance by complete state

Loading…
Cancel
Save