diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index f537fadf75..3ad64d010d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -268,20 +269,32 @@ public class DagHelper { * @return start Vertex list */ public static Collection getStartVertex(String parentNodeName, DAG dag, - Map completeTaskList){ + Map completeTaskList) { - if(completeTaskList == null){ + if (completeTaskList == null) { completeTaskList = new HashMap<>(); } Collection startVertexs = null; - if(StringUtils.isNotEmpty(parentNodeName)){ - startVertexs = dag.getSubsequentNodes(parentNodeName); - }else{ + if (StringUtils.isNotEmpty(parentNodeName)) { + TaskNode task = dag.getNode(parentNodeName); + if (task.isConditionsTask() && completeTaskList.containsKey(parentNodeName)) { + ConditionsParameters conditionsParameters = JSONUtils.parseObject(task.getConditionResult(), ConditionsParameters.class); + TaskInstance taskInstance = completeTaskList.get(parentNodeName); + if (taskInstance.getState().typeIsSuccess()) { + startVertexs = conditionsParameters.getSuccessNode(); + } else if (taskInstance.getState().typeIsFailure()) { + startVertexs = conditionsParameters.getFailedNode(); + } + } + else { + startVertexs = dag.getSubsequentNodes(parentNodeName); + } + } else { startVertexs = dag.getBeginNode(); } List tmpStartVertexs = new ArrayList<>(); - if(startVertexs!= null){ + if (startVertexs != null) { tmpStartVertexs.addAll(startVertexs); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 6d4906748b..dd4de8b635 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -405,7 +405,7 @@ public class MasterExecThread implements Runnable { if(task.isTaskComplete()){ completeTaskList.put(task.getName(), task); } - if(task.getState().typeIsFailure() && !task.taskCanRetry()){ + if (task.getState().typeIsFailure() && !task.taskCanRetry() && !task.isConditionsTask()) { errorTaskList.put(task.getName(), task); } }