Browse Source

make sure all failed task will save in errorTaskMap (#12424)

3.1.1-release
Yann Ann 2 years ago committed by zhuangchong
parent
commit
35608becac
  1. 23
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

23
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -392,13 +392,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().isFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
Long.toString(taskInstance.getTaskCode()),
dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@ -422,7 +422,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* release task group
*
* @param taskInstance
*/
public void releaseTaskGroup(TaskInstance taskInstance) {
logger.info("Release task group");
@ -449,7 +448,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* crate new task instance to retry, different objects from the original
*
* @param taskInstance
*/
private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
if (!taskInstance.taskCanRetry()) {
@ -662,10 +660,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
private boolean needComplementProcess() {
if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
return true;
}
return false;
return processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess();
}
/**
@ -1069,7 +1064,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* clone a new taskInstance for retry and reset some logic fields
*
* @return
* @return taskInstance
*/
public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
@ -1091,7 +1086,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* clone a new taskInstance for tolerant and reset some logic fields
*
* @return
* @return taskInstance
*/
public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
@ -1111,9 +1106,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* new a taskInstance
*
* @param processInstance
* @param taskNode
* @return
* @param processInstance process instance
* @param taskNode task node
* @return task instance
*/
public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
TaskInstance taskInstance = new TaskInstance();
@ -1431,9 +1426,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
long taskCode = Long.parseLong(dependNodeName);
Integer taskInstanceId = completeTaskMap.get(taskCode);
TaskExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
if (depTaskState.isFailure()) {
return false;
}
return !depTaskState.isFailure();
}
return true;
}

Loading…
Cancel
Save