Browse Source

调整数据库访问函数

pull/3/MERGE
Zhou Zheng 4 years ago
parent
commit
5d02e30772
  1. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
  2. 17
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  3. 52
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java

@ -70,10 +70,11 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("endTime") Date endTime @Param("endTime") Date endTime
); );
List<Integer> queryTaskByPIdAndStateAndType(@Param("processInstanceId") Integer processInstanceId, List<Integer> queryTaskByPIdAndStatusAndType(@Param("processInstanceId") Integer processInstanceId,
@Param("state") int state, @Param("states") int[] stateArray,
@Param("taskType") String taskType); @Param("taskType") String taskType);
List<Integer> queryTasksBySubProcessTaskIdAndState(@Param("subProcessTaskId") Integer subProcessTaskId, List<Integer> queryTaskBySubProcessTaskIdAndStatusAndType(@Param("subProcessTaskId") Integer subProcessTaskId,
@Param("state") Integer state); @Param("states") int[] stateArray,
@Param("taskType") String taskType);
} }

17
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -129,18 +129,27 @@
</if> </if>
order by instance.start_time desc order by instance.start_time desc
</select> </select>
<select id="queryTaskByPIdAndStateAndType" resultType="java.lang.Integer"> <select id="queryTaskByPIdAndStatusAndType" resultType="java.lang.Integer">
select id from t_ds_task_instance select id from t_ds_task_instance
where process_instance_id = #{processInstanceId} where process_instance_id = #{processInstanceId}
and task_type = #{taskType} and task_type = #{taskType}
and state = #{state} and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
and flag = 1 and flag = 1
</select> </select>
<select id="queryTasksBySubProcessTaskIdAndState" resultType="java.lang.Integer"> <select id="queryTaskBySubProcessTaskIdAndStatusAndType" resultType="java.lang.Integer">
select id from t_ds_task_instance select id from t_ds_task_instance
where process_instance_id = where process_instance_id =
(select process_instance_id from t_ds_relation_process_instance where parent_task_instance_id = #{subProcessTaskId}) (select process_instance_id from t_ds_relation_process_instance where parent_task_instance_id = #{subProcessTaskId})
and state = #{state} and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
<if test="taskType != null">
and task_type = #{taskType}
</if>
and flag = 1 and flag = 1
</select> </select>
</mapper> </mapper>

52
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -38,7 +38,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -692,19 +691,23 @@ public class ProcessService {
case SCHEDULER: case SCHEDULER:
break; break;
case RESUME_FROM_FORCED_SUCCESS: case RESUME_FROM_FORCED_SUCCESS:
// TODO: 在这里初始化processInstance的时候可能需要加入startnodelist参数
// find forced-success tasks here // find forced-success tasks here
List<Integer> forcedSuccessList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FORCED_SUCCESS); List<Integer> forcedSuccessList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FORCED_SUCCESS);
// deal with sub_process nodes // deal with sub_process nodes
List<Integer> failedSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.FAILURE, TaskType.SUB_PROCESS); // List<Integer> failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.FAILURE, TaskType.SUB_PROCESS);
List<Integer> toleranceSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE, TaskType.SUB_PROCESS); // List<Integer> toleranceSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE, TaskType.SUB_PROCESS);
List<Integer> killedSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.KILL, TaskType.SUB_PROCESS); // List<Integer> killedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.KILL, TaskType.SUB_PROCESS);
//
// failedSubList.addAll(toleranceSubList);
// failedSubList.addAll(killedSubList);
List<Integer> failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(),
new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE},
TaskType.SUB_PROCESS);
failedSubList.addAll(toleranceSubList);
failedSubList.addAll(killedSubList);
for (int i = 0; i < failedSubList.size(); i++) { for (int i = 0; i < failedSubList.size(); i++) {
List<Integer> tmpResultList = this.findTaskIdBySubProcessTaskIdAndState(failedSubList.get(i), ExecutionStatus.FORCED_SUCCESS); List<Integer> tmpResultList = this.findTaskIdInSubProcessByStatusAndType(failedSubList.get(i),
new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS},
null);
// if there is forced success in the sub_process // if there is forced success in the sub_process
if (tmpResultList != null && tmpResultList.size() > 0) { if (tmpResultList != null && tmpResultList.size() > 0) {
forcedSuccessList.add(failedSubList.get(i)); forcedSuccessList.add(failedSubList.get(i));
@ -1281,23 +1284,36 @@ public class ProcessService {
/** /**
* get id list by task state and type * get id list by task state and type
* @param instanceId process instance id * @param processInstanceId process instance id
* @param state task instance state * @param states task instance state array
* @param taskType task type * @param taskType task type
* @return task instance id list * @return task instance id list
*/ */
public List<Integer> findTaskIdByInstanceStateAndType(int instanceId, ExecutionStatus state, TaskType taskType){ public List<Integer> findTaskIdByInstanceStatusAndType(int processInstanceId, ExecutionStatus[] states, TaskType taskType){
return taskInstanceMapper.queryTaskByPIdAndStateAndType(instanceId, state.ordinal(), taskType.toString()); int[] stateArray = new int[states.length];
for (int i = 0; i < states.length; i++) {
stateArray[i] = states[i].ordinal();
}
return taskInstanceMapper.queryTaskByPIdAndStatusAndType(processInstanceId, stateArray, taskType.toString());
} }
/** /**
* get tasks in sub_process by sub_process task id and state * get tasks in sub_process by sub_process task id and state and type
* @param taskId task id * if param type is null, it queries all types
* @param state task instance state * @param taskId task instance id
* @param states task instance state array
* @param taskType task type
* @return task instance id list * @return task instance id list
*/ */
public List<Integer> findTaskIdBySubProcessTaskIdAndState(int taskId, ExecutionStatus state){ public List<Integer> findTaskIdInSubProcessByStatusAndType(int taskId, ExecutionStatus[] states, TaskType taskType){
return taskInstanceMapper.queryTasksBySubProcessTaskIdAndState(taskId, state.ordinal()); int[] stateArray = new int[states.length];
for (int i = 0; i < states.length; i++) {
stateArray[i] = states[i].ordinal();
}
if (taskType == null) {
return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, stateArray, null);
}
return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, stateArray, taskType.toString());
} }
/** /**

Loading…
Cancel
Save