diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index 9b3268e1b8..5d22184b8d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -70,10 +70,11 @@ public interface TaskInstanceMapper extends BaseMapper { @Param("endTime") Date endTime ); - List queryTaskByPIdAndStateAndType(@Param("processInstanceId") Integer processInstanceId, - @Param("state") int state, + List queryTaskByPIdAndStatusAndType(@Param("processInstanceId") Integer processInstanceId, + @Param("states") int[] stateArray, @Param("taskType") String taskType); - List queryTasksBySubProcessTaskIdAndState(@Param("subProcessTaskId") Integer subProcessTaskId, - @Param("state") Integer state); + List queryTaskBySubProcessTaskIdAndStatusAndType(@Param("subProcessTaskId") Integer subProcessTaskId, + @Param("states") int[] stateArray, + @Param("taskType") String taskType); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 8b654a2ec8..a25b46eecb 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -129,18 +129,27 @@ order by instance.start_time desc - select id from t_ds_task_instance where process_instance_id = #{processInstanceId} and task_type = #{taskType} - and state = #{state} + and state in + + #{i} + and flag = 1 - select id from t_ds_task_instance where process_instance_id = (select process_instance_id from t_ds_relation_process_instance where parent_task_instance_id = #{subProcessTaskId}) - and state = #{state} + and state in + + #{i} + + + and task_type = #{taskType} + and flag = 1 diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 7ccc937a02..48f13133cc 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -38,7 +38,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import java.io.File; import java.util.*; import java.util.stream.Collectors; @@ -692,19 +691,23 @@ public class ProcessService { case SCHEDULER: break; case RESUME_FROM_FORCED_SUCCESS: - // TODO: 在这里初始化processInstance的时候可能需要加入startnodelist参数 - // find forced-success tasks here List forcedSuccessList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FORCED_SUCCESS); // deal with sub_process nodes - List failedSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.FAILURE, TaskType.SUB_PROCESS); - List toleranceSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE, TaskType.SUB_PROCESS); - List killedSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.KILL, TaskType.SUB_PROCESS); +// List failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.FAILURE, TaskType.SUB_PROCESS); +// List toleranceSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE, TaskType.SUB_PROCESS); +// List killedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.KILL, TaskType.SUB_PROCESS); +// +// failedSubList.addAll(toleranceSubList); +// failedSubList.addAll(killedSubList); + List failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), + new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE}, + TaskType.SUB_PROCESS); - failedSubList.addAll(toleranceSubList); - failedSubList.addAll(killedSubList); for (int i = 0; i < failedSubList.size(); i++) { - List tmpResultList = this.findTaskIdBySubProcessTaskIdAndState(failedSubList.get(i), ExecutionStatus.FORCED_SUCCESS); + List tmpResultList = this.findTaskIdInSubProcessByStatusAndType(failedSubList.get(i), + new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS}, + null); // if there is forced success in the sub_process if (tmpResultList != null && tmpResultList.size() > 0) { forcedSuccessList.add(failedSubList.get(i)); @@ -1281,23 +1284,36 @@ public class ProcessService { /** * get id list by task state and type - * @param instanceId process instance id - * @param state task instance state + * @param processInstanceId process instance id + * @param states task instance state array * @param taskType task type * @return task instance id list */ - public List findTaskIdByInstanceStateAndType(int instanceId, ExecutionStatus state, TaskType taskType){ - return taskInstanceMapper.queryTaskByPIdAndStateAndType(instanceId, state.ordinal(), taskType.toString()); + public List findTaskIdByInstanceStatusAndType(int processInstanceId, ExecutionStatus[] states, TaskType taskType){ + int[] stateArray = new int[states.length]; + for (int i = 0; i < states.length; i++) { + stateArray[i] = states[i].ordinal(); + } + return taskInstanceMapper.queryTaskByPIdAndStatusAndType(processInstanceId, stateArray, taskType.toString()); } /** - * get tasks in sub_process by sub_process task id and state - * @param taskId task id - * @param state task instance state + * get tasks in sub_process by sub_process task id and state and type + * if param type is null, it queries all types + * @param taskId task instance id + * @param states task instance state array + * @param taskType task type * @return task instance id list */ - public List findTaskIdBySubProcessTaskIdAndState(int taskId, ExecutionStatus state){ - return taskInstanceMapper.queryTasksBySubProcessTaskIdAndState(taskId, state.ordinal()); + public List findTaskIdInSubProcessByStatusAndType(int taskId, ExecutionStatus[] states, TaskType taskType){ + int[] stateArray = new int[states.length]; + for (int i = 0; i < states.length; i++) { + stateArray[i] = states[i].ordinal(); + } + if (taskType == null) { + return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, stateArray, null); + } + return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, stateArray, taskType.toString()); } /**