diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 48f13133cc..dc2e468fcd 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -691,40 +691,29 @@ public class ProcessService { case SCHEDULER: break; case RESUME_FROM_FORCED_SUCCESS: - // find forced-success tasks here - List forcedSuccessList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FORCED_SUCCESS); - // deal with sub_process nodes -// List failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.FAILURE, TaskType.SUB_PROCESS); -// List toleranceSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE, TaskType.SUB_PROCESS); -// List killedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.KILL, TaskType.SUB_PROCESS); -// -// failedSubList.addAll(toleranceSubList); -// failedSubList.addAll(killedSubList); List failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE}, TaskType.SUB_PROCESS); for (int i = 0; i < failedSubList.size(); i++) { - List tmpResultList = this.findTaskIdInSubProcessByStatusAndType(failedSubList.get(i), - new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS}, - null); - // if there is forced success in the sub_process - if (tmpResultList != null && tmpResultList.size() > 0) { - forcedSuccessList.add(failedSubList.get(i)); + // if there exists forced success in the sub_process + if (haveForcedSuccessInSubProcess(failedSubList.get(i))) { // change sub_process task's state into submitted_success TaskInstance taskInstance = this.findTaskInstanceById(failedSubList.get(i)); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); updateTaskInstance(taskInstance); } } - - // set resume node list + /** + * set resume node list to null + * 1. we can have a complete dag in the ExecThread so that it can restore the previous context + * 2. each time the operation is done the state of process will be reasonable as usual + */ cmdParam.remove(CMDPARAM_RECOVERY_START_NODE_STRING); cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, - String.join(Constants.COMMA, convertIntListToString(forcedSuccessList))); + String.join(Constants.COMMA, convertIntListToString(null))); processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); processInstance.setRunTimes(runTime + 1); - break; default: break; @@ -733,6 +722,30 @@ public class ProcessService { return processInstance; } + /** + * recursively check if a sub process node contains forced success node + * @param taskInstanceId task instance id + * @return true or false + */ + public boolean haveForcedSuccessInSubProcess(int taskInstanceId) { + List forcedSuccessList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId, + new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS}, + null); + if (forcedSuccessList != null && forcedSuccessList.size() > 0) { + return true; + } + + List childSubList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId, + new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE}, + TaskType.SUB_PROCESS); + for (Integer child : childSubList) { + if (haveForcedSuccessInSubProcess(child)) { + return true; + } + } + return false; + } + /** * return complement data if the process start with complement data * @param processInstance processInstance