From 5e9d664b8081407c9dd0dae68a21bd7238f02713 Mon Sep 17 00:00:00 2001 From: Zhou Zheng <1606079777@qq.comom> Date: Sun, 26 Jul 2020 00:19:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=8E=E5=BC=BA=E5=88=B6=E6=88=90=E5=8A=9F?= =?UTF-8?q?=E5=A4=84=E5=90=AF=E5=8A=A8-masterServer=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 添加两个查询(mapper.java和mapper.xml) 2. 添加ProcessService中constructProcessInstance中对应的部分 --- .../dao/mapper/TaskInstanceMapper.java | 7 +++ .../dao/mapper/TaskInstanceMapper.xml | 14 ++++++ .../service/process/ProcessService.java | 47 +++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index ac23b25c9c..9b3268e1b8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -69,4 +69,11 @@ public interface TaskInstanceMapper extends BaseMapper { @Param("startTime") Date startTime, @Param("endTime") Date endTime ); + + List queryTaskByPIdAndStateAndType(@Param("processInstanceId") Integer processInstanceId, + @Param("state") int state, + @Param("taskType") String taskType); + + List queryTasksBySubProcessTaskIdAndState(@Param("subProcessTaskId") Integer subProcessTaskId, + @Param("state") Integer state); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 143761bf8c..ebf446a0d8 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -129,4 +129,18 @@ order by instance.start_time desc + + diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 21de9f04d7..8a379ee446 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -690,6 +690,38 @@ public class ProcessService { initComplementDataParam(processDefinition, processInstance, cmdParam); break; case SCHEDULER: + break; + case RESUME_FROM_FORCED_SUCCESS: + // TODO: 在这里初始化processInstance的时候可能需要加入startnodelist参数 + + // find forced-success tasks here + List forcedSuccessList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FORCED_SUCCESS); + // deal with sub_process nodes + List failedSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.FAILURE, TaskType.SUB_PROCESS); + List toleranceSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE, TaskType.SUB_PROCESS); + List killedSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.KILL, TaskType.SUB_PROCESS); + + failedSubList.addAll(toleranceSubList); + failedSubList.addAll(killedSubList); + for (int i = 0; i < failedSubList.size(); i++) { + List tmpResultList = this.findTaskIdBySubProcessTaskIdAndState(failedSubList.get(i), ExecutionStatus.FORCED_SUCCESS); + // if there is forced success in the sub_process + if (tmpResultList != null && tmpResultList.size() > 0) { + forcedSuccessList.add(failedSubList.get(i)); + // change sub_process task's state into submitted_success + TaskInstance taskInstance = this.findTaskInstanceById(failedSubList.get(i)); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + updateTaskInstance(taskInstance); + } + } + + // set resume node list + cmdParam.remove(CMDPARAM_RECOVERY_START_NODE_STRING); + cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, + String.join(Constants.COMMA, convertIntListToString(forcedSuccessList))); + processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); + processInstance.setRunTimes(runTime + 1); + break; default: break; @@ -1247,6 +1279,21 @@ public class ProcessService { return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal()); } + /** + * get id list by task state and type + * @param instanceId process instance id + * @param state task instance state + * @param taskType task type + * @return task instance id list + */ + public List findTaskIdByInstanceStateAndType(int instanceId, ExecutionStatus state, TaskType taskType){ + return taskInstanceMapper.queryTaskByPIdAndStateAndType(instanceId, state.ordinal(), taskType.toString()); + } + + public List findTaskIdBySubProcessTaskIdAndState(int taskId, ExecutionStatus state){ + return taskInstanceMapper.queryTasksBySubProcessTaskIdAndState(taskId, state.ordinal()); + } + /** * find valid task list by process definition id * @param processInstanceId processInstanceId