diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index b37f6f66c2..4b039f1a93 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -1075,6 +1075,7 @@ public class MasterExecThread implements Runnable { // updateProcessInstance completed task status // failure priority is higher than pause // if a task fails, other suspended tasks need to be reset kill + // check if there exists forced success nodes in errorTaskList if(errorTaskList.size() > 0){ for(Map.Entry entry: completeTaskList.entrySet()) { TaskInstance completeTask = entry.getValue(); @@ -1084,6 +1085,23 @@ public class MasterExecThread implements Runnable { processService.updateTaskInstance(completeTask); } } + + for(Map.Entry entry: errorTaskList.entrySet()) { + TaskInstance errorTask = entry.getValue(); + TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId()); + if (currentTask == null) { + continue; + } + // for nodes that have been forced success + if (errorTask.getState().typeIsFailure() && currentTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { + // update state in this thread and remove from errorTaskList + errorTask.setState(currentTask.getState()); + logger.info("task: {} has been forced success, remove it from error task list", errorTask.getName()); + errorTaskList.remove(errorTask.getName()); + // submit post nodes + submitPostNode(errorTask.getName()); + } + } } if(canSubmitTaskToQueue()){ submitStandByTask(); @@ -1177,6 +1195,20 @@ public class MasterExecThread implements Runnable { private void submitStandByTask(){ for(Map.Entry entry: readyToSubmitTaskList.entrySet()) { TaskInstance task = entry.getValue(); + // stop tasks which is retrying if forced success happens + if (task.taskCanRetry()) { + TaskInstance tmpTask = processService.findTaskInstanceById(task.getId()); + if (tmpTask != null) { + if (tmpTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { + task.setState(tmpTask.getState()); + logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); + removeTaskFromStandbyList(task); + completeTaskList.put(task.getName(), task); + submitPostNode(task.getName()); + continue; + } + } + } DependResult dependResult = getDependResultForTask(task); if(DependResult.SUCCESS == dependResult){ if(retryTaskIntervalOverTime(task)){