Browse Source

[Fix-13272] [Master] resume failure processInstance state remain failure (#13425)

* fix 13272

Co-authored-by: JinyLeeChina <jiny.li@foxmail.com>
2.0.8-release
JinYong Li 2 years ago committed by GitHub
parent
commit
e4b460aee7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  2. 44
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -1145,16 +1145,9 @@ public class WorkflowExecuteThread implements Runnable {
} else {
if (processInstance.getCommandType() == CommandType.RECOVER_TOLERANCE_FAULT_PROCESS
|| processInstance.getCommandType() == CommandType.RECOVER_SUSPENDED_PROCESS) {
List<Integer> failedList = processService.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FAILURE);
if (!failedList.isEmpty()) {
return true;
}
List<Integer> toleranceList = processService.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE);
if (!toleranceList.isEmpty()) {
return true;
}
List<Integer> killedList = processService.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
if (!killedList.isEmpty()) {
List<Integer> failureTaskIds = processService.findLastTaskIdByStateList(processInstance.getId(),
Lists.newArrayList(ExecutionStatus.FAILURE, ExecutionStatus.NEED_FAULT_TOLERANCE, ExecutionStatus.KILL));
if (!failureTaskIds.isEmpty()) {
return true;
}
}

44
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -809,18 +809,14 @@ public class ProcessService {
break;
case START_FAILURE_TASK_PROCESS:
// find failed tasks and init these tasks
List<Integer> failedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FAILURE);
List<Integer> toleranceList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE);
List<Integer> killedList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.KILL);
List<Integer> needReSubmitTasks = this.findLastTaskIdByStateList(processInstance.getId(),
Lists.newArrayList(ExecutionStatus.FAILURE, ExecutionStatus.NEED_FAULT_TOLERANCE, ExecutionStatus.KILL));
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
failedList.addAll(killedList);
failedList.addAll(toleranceList);
for (Integer taskId : failedList) {
for (Integer taskId : needReSubmitTasks) {
initTaskInstance(this.findTaskInstanceById(taskId));
}
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(failedList)));
String.join(Constants.COMMA, convertIntListToString(needReSubmitTasks)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
@ -831,15 +827,12 @@ public class ProcessService {
case RECOVER_SUSPENDED_PROCESS:
// find pause tasks and init task's state
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
ExecutionStatus.KILL);
suspendedNodeList.addAll(stopNodeList);
for (Integer taskId : suspendedNodeList) {
List<Integer> needReSubmitNodeList = this.findLastTaskIdByStateList(processInstance.getId(), Lists.newArrayList(ExecutionStatus.PAUSE, ExecutionStatus.KILL));
for (Integer taskId : needReSubmitNodeList) {
// initialize the pause state
initTaskInstance(this.findTaskInstanceById(taskId));
}
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, String.join(",", convertIntListToString(suspendedNodeList)));
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, String.join(",", convertIntListToString(needReSubmitNodeList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
@ -1624,6 +1617,29 @@ public class ProcessService {
return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
}
/**
* get id list by task state list
*
* @param instanceId instanceId
* @param stateList stateList
* @return task instance ids
*/
public List<Integer> findLastTaskIdByStateList(int instanceId, List<ExecutionStatus> stateList) {
List<TaskInstance> validTaskInstanceList = this.findValidTaskListByProcessId(instanceId);
Map<Long, TaskInstance> validTaskInstanceMap = new HashMap<>();
for (TaskInstance instance : validTaskInstanceList) {
validTaskInstanceMap.compute(instance.getTaskCode(), (k, v) -> {
if (v == null || v.getId() < instance.getId()) {
return instance;
} else {
return v;
}
});
}
return validTaskInstanceMap.values().stream().filter(t -> stateList.contains(t.getState()))
.map(TaskInstance::getId).collect(Collectors.toList());
}
/**
* find valid task list by process definition id
*

Loading…
Cancel
Save