Browse Source

[Bug-7319][MasterServer] fix taskNode NPE when switch else branch is empty (#7320)

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
99b8ec6492
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  2. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -50,7 +50,6 @@ public class DagHelper {
private static final Logger logger = LoggerFactory.getLogger(DagHelper.class); private static final Logger logger = LoggerFactory.getLogger(DagHelper.class);
/** /**
* generate flow node relation list by task node list; * generate flow node relation list by task node list;
* Edges that are not in the task Node List will not be added to the result * Edges that are not in the task Node List will not be added to the result
@ -135,7 +134,6 @@ public class DagHelper {
return destTaskNodeList; return destTaskNodeList;
} }
/** /**
* find all the nodes that depended on the start node * find all the nodes that depended on the start node
* *
@ -160,7 +158,6 @@ public class DagHelper {
return resultList; return resultList;
} }
/** /**
* find all nodes that start nodes depend on. * find all nodes that start nodes depend on.
* *
@ -310,6 +307,10 @@ public class DagHelper {
} }
for (String subsequent : startVertexes) { for (String subsequent : startVertexes) {
TaskNode taskNode = dag.getNode(subsequent); TaskNode taskNode = dag.getNode(subsequent);
if (taskNode == null) {
logger.error("taskNode {} is null, please check dag", subsequent);
continue;
}
if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) { if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList); setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList);
continue; continue;
@ -343,7 +344,6 @@ public class DagHelper {
return true; return true;
} }
/** /**
* parse condition task find the branch process * parse condition task find the branch process
* set skip flag for another one. * set skip flag for another one.
@ -443,7 +443,6 @@ public class DagHelper {
} }
} }
/*** /***
* build dag graph * build dag graph
* @param processDag processDag * @param processDag processDag

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils; import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import java.util.Date; import java.util.Date;
@ -75,7 +76,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
taskDefinition = processService.findTaskDefinition( taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
); );
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),processInstance.getProcessDefinitionCode(), taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(), taskInstance.getProcessInstanceId(),
taskInstance.getId())); taskInstance.getId()));
@ -176,7 +177,13 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
switchParameters.setResultConditionLocation(finalConditionLocation); switchParameters.setResultConditionLocation(finalConditionLocation);
taskInstance.setSwitchDependency(switchParameters); taskInstance.setSwitchDependency(switchParameters);
logger.info("the switch task depend result : {}", conditionResult); if (!isValidSwitchResult(switchResultVos.get(finalConditionLocation))) {
conditionResult = DependResult.FAILED;
logger.error("the switch task depend result is invalid, result:{}, switch branch:{}", conditionResult, finalConditionLocation);
return true;
}
logger.info("the switch task depend result:{}, switch branch:{}", conditionResult, finalConditionLocation);
return true; return true;
} }
@ -221,4 +228,18 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return content; return content;
} }
/**
* check whether switch result is valid
*/
private boolean isValidSwitchResult(SwitchResultVo switchResult) {
if (CollectionUtils.isEmpty(switchResult.getNextNode())) {
return false;
}
for (String nextNode : switchResult.getNextNode()) {
if (StringUtils.isEmpty(nextNode)) {
return false;
}
}
return true;
}
} }

Loading…
Cancel
Save