Browse Source

fix bug about CONDITION-task

1. condition shoudn't be in error list
2. condition need more checks when rusume_from_forced_success
pull/3/MERGE
Zhou Zheng 4 years ago
parent
commit
d2a43b0c25
  1. 25
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

25
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -268,20 +269,32 @@ public class DagHelper {
* @return start Vertex list
*/
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList){
Map<String, TaskInstance> completeTaskList) {
if(completeTaskList == null){
if (completeTaskList == null) {
completeTaskList = new HashMap<>();
}
Collection<String> startVertexs = null;
if(StringUtils.isNotEmpty(parentNodeName)){
startVertexs = dag.getSubsequentNodes(parentNodeName);
}else{
if (StringUtils.isNotEmpty(parentNodeName)) {
TaskNode task = dag.getNode(parentNodeName);
if (task.isConditionsTask() && completeTaskList.containsKey(parentNodeName)) {
ConditionsParameters conditionsParameters = JSONUtils.parseObject(task.getConditionResult(), ConditionsParameters.class);
TaskInstance taskInstance = completeTaskList.get(parentNodeName);
if (taskInstance.getState().typeIsSuccess()) {
startVertexs = conditionsParameters.getSuccessNode();
} else if (taskInstance.getState().typeIsFailure()) {
startVertexs = conditionsParameters.getFailedNode();
}
}
else {
startVertexs = dag.getSubsequentNodes(parentNodeName);
}
} else {
startVertexs = dag.getBeginNode();
}
List<String> tmpStartVertexs = new ArrayList<>();
if(startVertexs!= null){
if (startVertexs != null) {
tmpStartVertexs.addAll(startVertexs);
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -405,7 +405,7 @@ public class MasterExecThread implements Runnable {
if(task.isTaskComplete()){
completeTaskList.put(task.getName(), task);
}
if(task.getState().typeIsFailure() && !task.taskCanRetry()){
if (task.getState().typeIsFailure() && !task.taskCanRetry() && !task.isConditionsTask()) {
errorTaskList.put(task.getName(), task);
}
}

Loading…
Cancel
Save