Browse Source

make sure all failed task will save in errorTaskMap (#12424)

3.2.0-release
Yann Ann 2 years ago committed by GitHub
parent
commit
38b643f69b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

23
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -391,13 +391,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
retryTaskInstance(taskInstance); retryTaskInstance(taskInstance);
} else if (taskInstance.getState().isFailure()) { } else if (taskInstance.getState().isFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE // There are child nodes and the failure policy is: CONTINUE
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode( if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
Long.toString(taskInstance.getTaskCode()), Long.toString(taskInstance.getTaskCode()),
dag)) { dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode())); submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else { } else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) { if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks(); killAllTasks();
} }
@ -421,7 +421,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* release task group * release task group
* *
* @param taskInstance
*/ */
public void releaseTaskGroup(TaskInstance taskInstance) { public void releaseTaskGroup(TaskInstance taskInstance) {
logger.info("Release task group"); logger.info("Release task group");
@ -448,7 +447,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* crate new task instance to retry, different objects from the original * crate new task instance to retry, different objects from the original
* *
* @param taskInstance
*/ */
private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException { private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
if (!taskInstance.taskCanRetry()) { if (!taskInstance.taskCanRetry()) {
@ -662,10 +660,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
private boolean needComplementProcess() { private boolean needComplementProcess() {
if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) { return processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess();
return true;
}
return false;
} }
/** /**
@ -1069,7 +1064,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* clone a new taskInstance for retry and reset some logic fields * clone a new taskInstance for retry and reset some logic fields
* *
* @return * @return taskInstance
*/ */
public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) { public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
@ -1097,7 +1092,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* clone a new taskInstance for tolerant and reset some logic fields * clone a new taskInstance for tolerant and reset some logic fields
* *
* @return * @return taskInstance
*/ */
public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) { public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
@ -1119,9 +1114,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* new a taskInstance * new a taskInstance
* *
* @param processInstance * @param processInstance process instance
* @param taskNode * @param taskNode task node
* @return * @return task instance
*/ */
public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
@ -1445,9 +1440,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
long taskCode = Long.parseLong(dependNodeName); long taskCode = Long.parseLong(dependNodeName);
Integer taskInstanceId = completeTaskMap.get(taskCode); Integer taskInstanceId = completeTaskMap.get(taskCode);
TaskExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState(); TaskExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
if (depTaskState.isFailure()) { return !depTaskState.isFailure();
return false;
}
} }
return true; return true;
} }

Loading…
Cancel
Save