Browse Source

从强制成功处启动-masterServer部分

1. 添加两个查询(mapper.java和mapper.xml)
2. 添加ProcessService中constructProcessInstance中对应的部分
pull/3/MERGE
Zhou Zheng 4 years ago
parent
commit
5e9d664b80
  1. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
  2. 14
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  3. 47
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java

@ -69,4 +69,11 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("startTime") Date startTime,
@Param("endTime") Date endTime
);
List<Integer> queryTaskByPIdAndStateAndType(@Param("processInstanceId") Integer processInstanceId,
@Param("state") int state,
@Param("taskType") String taskType);
List<Integer> queryTasksBySubProcessTaskIdAndState(@Param("subProcessTaskId") Integer subProcessTaskId,
@Param("state") Integer state);
}

14
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -129,4 +129,18 @@
</if>
order by instance.start_time desc
</select>
<select id="queryTasksByPIdAndStatesAndType">
select id from t_ds_task_instance
where process_instance_id = #{processInstanceId}
and task_type = #{taskType}
and state = #{state}
and flag = 1
</select>
<select id="queryTasksBySubProcessTaskIdAndState">
select id from t_ds_task_instance
where process_instance_id =
(select process_instance_id in t_ds_relation_process_instance where parent_task_instance_id = #{subProcessTaskId})
and state = #{state}
and flag = 1
</select>
</mapper>

47
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -690,6 +690,38 @@ public class ProcessService {
initComplementDataParam(processDefinition, processInstance, cmdParam);
break;
case SCHEDULER:
break;
case RESUME_FROM_FORCED_SUCCESS:
// TODO: 在这里初始化processInstance的时候可能需要加入startnodelist参数
// find forced-success tasks here
List<Integer> forcedSuccessList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FORCED_SUCCESS);
// deal with sub_process nodes
List<Integer> failedSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.FAILURE, TaskType.SUB_PROCESS);
List<Integer> toleranceSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE, TaskType.SUB_PROCESS);
List<Integer> killedSubList = this.findTaskIdByInstanceStateAndType(processInstance.getId(), ExecutionStatus.KILL, TaskType.SUB_PROCESS);
failedSubList.addAll(toleranceSubList);
failedSubList.addAll(killedSubList);
for (int i = 0; i < failedSubList.size(); i++) {
List<Integer> tmpResultList = this.findTaskIdBySubProcessTaskIdAndState(failedSubList.get(i), ExecutionStatus.FORCED_SUCCESS);
// if there is forced success in the sub_process
if (tmpResultList != null && tmpResultList.size() > 0) {
forcedSuccessList.add(failedSubList.get(i));
// change sub_process task's state into submitted_success
TaskInstance taskInstance = this.findTaskInstanceById(failedSubList.get(i));
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
}
}
// set resume node list
cmdParam.remove(CMDPARAM_RECOVERY_START_NODE_STRING);
cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(forcedSuccessList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
default:
break;
@ -1247,6 +1279,21 @@ public class ProcessService {
return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
}
/**
* get id list by task state and type
* @param instanceId process instance id
* @param state task instance state
* @param taskType task type
* @return task instance id list
*/
public List<Integer> findTaskIdByInstanceStateAndType(int instanceId, ExecutionStatus state, TaskType taskType){
return taskInstanceMapper.queryTaskByPIdAndStateAndType(instanceId, state.ordinal(), taskType.toString());
}
public List<Integer> findTaskIdBySubProcessTaskIdAndState(int taskId, ExecutionStatus state){
return taskInstanceMapper.queryTasksBySubProcessTaskIdAndState(taskId, state.ordinal());
}
/**
* find valid task list by process definition id
* @param processInstanceId processInstanceId

Loading…
Cancel
Save