Browse Source

改变原有的dag构建参数

1. 这样的话可以完整的构建整个dag,恢复之前的上下文;也没有任务可能重复执行的担忧
2. 不需要额外去处理process执行完之后的状态(主要是部分与整体的原因)
3. RecoverNodeIdList也不会重复
pull/3/MERGE
Zhou Zheng 4 years ago
parent
commit
18ae56bb6a
  1. 51
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

51
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -691,40 +691,29 @@ public class ProcessService {
case SCHEDULER: case SCHEDULER:
break; break;
case RESUME_FROM_FORCED_SUCCESS: case RESUME_FROM_FORCED_SUCCESS:
// find forced-success tasks here
List<Integer> forcedSuccessList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.FORCED_SUCCESS);
// deal with sub_process nodes
// List<Integer> failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.FAILURE, TaskType.SUB_PROCESS);
// List<Integer> toleranceSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.NEED_FAULT_TOLERANCE, TaskType.SUB_PROCESS);
// List<Integer> killedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), ExecutionStatus.KILL, TaskType.SUB_PROCESS);
//
// failedSubList.addAll(toleranceSubList);
// failedSubList.addAll(killedSubList);
List<Integer> failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), List<Integer> failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(),
new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE}, new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE},
TaskType.SUB_PROCESS); TaskType.SUB_PROCESS);
for (int i = 0; i < failedSubList.size(); i++) { for (int i = 0; i < failedSubList.size(); i++) {
List<Integer> tmpResultList = this.findTaskIdInSubProcessByStatusAndType(failedSubList.get(i), // if there exists forced success in the sub_process
new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS}, if (haveForcedSuccessInSubProcess(failedSubList.get(i))) {
null);
// if there is forced success in the sub_process
if (tmpResultList != null && tmpResultList.size() > 0) {
forcedSuccessList.add(failedSubList.get(i));
// change sub_process task's state into submitted_success // change sub_process task's state into submitted_success
TaskInstance taskInstance = this.findTaskInstanceById(failedSubList.get(i)); TaskInstance taskInstance = this.findTaskInstanceById(failedSubList.get(i));
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance); updateTaskInstance(taskInstance);
} }
} }
/**
// set resume node list * set resume node list to null
* 1. we can have a complete dag in the ExecThread so that it can restore the previous context
* 2. each time the operation is done the state of process will be reasonable as usual
*/
cmdParam.remove(CMDPARAM_RECOVERY_START_NODE_STRING); cmdParam.remove(CMDPARAM_RECOVERY_START_NODE_STRING);
cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(forcedSuccessList))); String.join(Constants.COMMA, convertIntListToString(null)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1); processInstance.setRunTimes(runTime + 1);
break; break;
default: default:
break; break;
@ -733,6 +722,30 @@ public class ProcessService {
return processInstance; return processInstance;
} }
/**
* recursively check if a sub process node contains forced success node
* @param taskInstanceId task instance id
* @return true or false
*/
public boolean haveForcedSuccessInSubProcess(int taskInstanceId) {
List<Integer> forcedSuccessList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId,
new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS},
null);
if (forcedSuccessList != null && forcedSuccessList.size() > 0) {
return true;
}
List<Integer> childSubList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId,
new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE},
TaskType.SUB_PROCESS);
for (Integer child : childSubList) {
if (haveForcedSuccessInSubProcess(child)) {
return true;
}
}
return false;
}
/** /**
* return complement data if the process start with complement data * return complement data if the process start with complement data
* @param processInstance processInstance * @param processInstance processInstance

Loading…
Cancel
Save