|
|
@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; |
|
|
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.google.common.collect.Lists; |
|
|
|
import com.google.common.collect.Lists; |
|
|
|
|
|
|
|
import net.sf.jsqlparser.expression.NextValExpression; |
|
|
|
import org.apache.commons.io.FileUtils; |
|
|
|
import org.apache.commons.io.FileUtils; |
|
|
|
import org.apache.dolphinscheduler.common.Constants; |
|
|
|
import org.apache.dolphinscheduler.common.Constants; |
|
|
|
import org.apache.dolphinscheduler.common.enums.*; |
|
|
|
import org.apache.dolphinscheduler.common.enums.*; |
|
|
@ -663,7 +664,6 @@ public class MasterExecThread implements Runnable { |
|
|
|
if(startNodes.contains(taskName)){ |
|
|
|
if(startNodes.contains(taskName)){ |
|
|
|
return DependResult.SUCCESS; |
|
|
|
return DependResult.SUCCESS; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
TaskNode taskNode = dag.getNode(taskName); |
|
|
|
TaskNode taskNode = dag.getNode(taskName); |
|
|
|
List<String> depNameList = taskNode.getDepList(); |
|
|
|
List<String> depNameList = taskNode.getDepList(); |
|
|
|
for(String depsNode : depNameList ){ |
|
|
|
for(String depsNode : depNameList ){ |
|
|
@ -672,25 +672,30 @@ public class MasterExecThread implements Runnable { |
|
|
|
|| skipTaskNodeList.containsKey(depsNode)){ |
|
|
|
|| skipTaskNodeList.containsKey(depsNode)){ |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
// dependencies must be fully completed
|
|
|
|
// all the dependencies must be completed
|
|
|
|
if(!completeTaskList.containsKey(depsNode)){ |
|
|
|
if(!completeTaskList.containsKey(depsNode)){ |
|
|
|
return DependResult.WAITING; |
|
|
|
return DependResult.WAITING; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// depend node has already complete.
|
|
|
|
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); |
|
|
|
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); |
|
|
|
// conditions task would not return failed.
|
|
|
|
|
|
|
|
if(depTaskState.typeIsFailure() |
|
|
|
|
|
|
|
&& !DagHelper.haveConditionsAfterNode(depsNode, dag ) |
|
|
|
|
|
|
|
&& !dag.getNode(depsNode).isConditionsTask()){ |
|
|
|
|
|
|
|
return DependResult.FAILED; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ |
|
|
|
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ |
|
|
|
return DependResult.WAITING; |
|
|
|
return DependResult.WAITING; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// ignore task state if current task is condition
|
|
|
|
|
|
|
|
if(taskNode.isConditionsTask()){ |
|
|
|
|
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if(dag.getNode(depsNode).isConditionsTask()){ |
|
|
|
|
|
|
|
//condition task need check the branch to run
|
|
|
|
|
|
|
|
List<String> nextTaskList = parseConditionTask(depsNode); |
|
|
|
|
|
|
|
if(!nextTaskList.contains(taskName)){ |
|
|
|
|
|
|
|
return DependResult.FAILED; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}else if(depTaskState.typeIsFailure()){ |
|
|
|
|
|
|
|
return DependResult.FAILED; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); |
|
|
|
logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); |
|
|
|
|
|
|
|
|
|
|
|
return DependResult.SUCCESS; |
|
|
|
return DependResult.SUCCESS; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|