From 18ae56bb6a3516db69ecb880ca6ec1e9c88305e8 Mon Sep 17 00:00:00 2001 From: Zhou Zheng <1606079777@qq.comom> Date: Wed, 5 Aug 2020 21:42:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=8F=98=E5=8E=9F=E6=9C=89=E7=9A=84da?= =?UTF-8?q?g=E6=9E=84=E5=BB=BA=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 这样的话可以完整的构建整个dag,恢复之前的上下文;也没有任务可能重复执行的担忧 2. 不需要额外去处理process执行完之后的状态(主要是部分与整体的原因) 3. RecoverNodeIdList也不会重复 --- .../service/process/ProcessService.java | 51 ++++++++++++------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 48f13133cc..dc2e468fcd 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -691,40 +691,29 @@ public class ProcessService { case SCHEDULER: break; case RESUME_FROM_FORCED_SUCCESS: - // find forced-success tasks here - List forcedSuccessList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FORCED_SUCCESS); - // deal with sub_process nodes -// List failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.FAILURE, TaskType.SUB_PROCESS); -// List toleranceSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE, TaskType.SUB_PROCESS); -// List killedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.KILL, TaskType.SUB_PROCESS); -// -// failedSubList.addAll(toleranceSubList); -// failedSubList.addAll(killedSubList); List failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE}, TaskType.SUB_PROCESS); for (int i = 0; i < failedSubList.size(); i++) { - List tmpResultList = this.findTaskIdInSubProcessByStatusAndType(failedSubList.get(i), - new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS}, - null); - // if there is forced success in the sub_process - if (tmpResultList != null && tmpResultList.size() > 0) { - forcedSuccessList.add(failedSubList.get(i)); + // if there exists forced success in the sub_process + if (haveForcedSuccessInSubProcess(failedSubList.get(i))) { // change sub_process task's state into submitted_success TaskInstance taskInstance = this.findTaskInstanceById(failedSubList.get(i)); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); updateTaskInstance(taskInstance); } } - - // set resume node list + /** + * set resume node list to null + * 1. we can have a complete dag in the ExecThread so that it can restore the previous context + * 2. each time the operation is done the state of process will be reasonable as usual + */ cmdParam.remove(CMDPARAM_RECOVERY_START_NODE_STRING); cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, - String.join(Constants.COMMA, convertIntListToString(forcedSuccessList))); + String.join(Constants.COMMA, convertIntListToString(null))); processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); processInstance.setRunTimes(runTime + 1); - break; default: break; @@ -733,6 +722,30 @@ public class ProcessService { return processInstance; } + /** + * recursively check if a sub process node contains forced success node + * @param taskInstanceId task instance id + * @return true or false + */ + public boolean haveForcedSuccessInSubProcess(int taskInstanceId) { + List forcedSuccessList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId, + new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS}, + null); + if (forcedSuccessList != null && forcedSuccessList.size() > 0) { + return true; + } + + List childSubList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId, + new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE}, + TaskType.SUB_PROCESS); + for (Integer child : childSubList) { + if (haveForcedSuccessInSubProcess(child)) { + return true; + } + } + return false; + } + /** * return complement data if the process start with complement data * @param processInstance processInstance