Browse Source

增加强制成功对运行中process的影响

1. 失败任务强制成功后会继续submit后续结点
2. 失败重试的节点强制成功后会继续submit后面结点,并且停止retry
3. 对于失败的sub-process如果其中的结点强制成功了,在parent-process中不会有任何影响,只能等到结束后“从强制成功处继续执行”。
pull/3/MERGE
Zhou Zheng 4 years ago
parent
commit
48afb2e2f8
  1. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -1075,6 +1075,7 @@ public class MasterExecThread implements Runnable {
// updateProcessInstance completed task status
// failure priority is higher than pause
// if a task fails, other suspended tasks need to be reset kill
// check if there exists forced success nodes in errorTaskList
if(errorTaskList.size() > 0){
for(Map.Entry<String, TaskInstance> entry: completeTaskList.entrySet()) {
TaskInstance completeTask = entry.getValue();
@ -1084,6 +1085,23 @@ public class MasterExecThread implements Runnable {
processService.updateTaskInstance(completeTask);
}
}
for(Map.Entry<String, TaskInstance> entry: errorTaskList.entrySet()) {
TaskInstance errorTask = entry.getValue();
TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId());
if (currentTask == null) {
continue;
}
// for nodes that have been forced success
if (errorTask.getState().typeIsFailure() && currentTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
// update state in this thread and remove from errorTaskList
errorTask.setState(currentTask.getState());
logger.info("task: {} has been forced success, remove it from error task list", errorTask.getName());
errorTaskList.remove(errorTask.getName());
// submit post nodes
submitPostNode(errorTask.getName());
}
}
}
if(canSubmitTaskToQueue()){
submitStandByTask();
@ -1177,6 +1195,20 @@ public class MasterExecThread implements Runnable {
private void submitStandByTask(){
for(Map.Entry<String, TaskInstance> entry: readyToSubmitTaskList.entrySet()) {
TaskInstance task = entry.getValue();
// stop tasks which is retrying if forced success happens
if (task.taskCanRetry()) {
TaskInstance tmpTask = processService.findTaskInstanceById(task.getId());
if (tmpTask != null) {
if (tmpTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
task.setState(tmpTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
continue;
}
}
}
DependResult dependResult = getDependResultForTask(task);
if(DependResult.SUCCESS == dependResult){
if(retryTaskIntervalOverTime(task)){

Loading…
Cancel
Save