diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index d419505e83..e7404fe141 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -50,7 +50,6 @@ public class DagHelper { private static final Logger logger = LoggerFactory.getLogger(DagHelper.class); - /** * generate flow node relation list by task node list; * Edges that are not in the task Node List will not be added to the result @@ -135,7 +134,6 @@ public class DagHelper { return destTaskNodeList; } - /** * find all the nodes that depended on the start node * @@ -160,7 +158,6 @@ public class DagHelper { return resultList; } - /** * find all nodes that start nodes depend on. * @@ -310,6 +307,10 @@ public class DagHelper { } for (String subsequent : startVertexes) { TaskNode taskNode = dag.getNode(subsequent); + if (taskNode == null) { + logger.error("taskNode {} is null, please check dag", subsequent); + continue; + } if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) { setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList); continue; @@ -343,7 +344,6 @@ public class DagHelper { return true; } - /** * parse condition task find the branch process * set skip flag for another one. @@ -443,7 +443,6 @@ public class DagHelper { } } - /*** * build dag graph * @param processDag processDag diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 53786496a1..0fa6e5dbd2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.StringUtils; import java.util.Date; @@ -75,7 +76,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { taskDefinition = processService.findTaskDefinition( taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() ); - taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),processInstance.getProcessDefinitionCode(), + taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); @@ -176,7 +177,13 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { switchParameters.setResultConditionLocation(finalConditionLocation); taskInstance.setSwitchDependency(switchParameters); - logger.info("the switch task depend result : {}", conditionResult); + if (!isValidSwitchResult(switchResultVos.get(finalConditionLocation))) { + conditionResult = DependResult.FAILED; + logger.error("the switch task depend result is invalid, result:{}, switch branch:{}", conditionResult, finalConditionLocation); + return true; + } + + logger.info("the switch task depend result:{}, switch branch:{}", conditionResult, finalConditionLocation); return true; } @@ -221,4 +228,18 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { return content; } + /** + * check whether switch result is valid + */ + private boolean isValidSwitchResult(SwitchResultVo switchResult) { + if (CollectionUtils.isEmpty(switchResult.getNextNode())) { + return false; + } + for (String nextNode : switchResult.getNextNode()) { + if (StringUtils.isEmpty(nextNode)) { + return false; + } + } + return true; + } }