Browse Source

[Fix-9065] [master] when task submit failed, remove from queue and add state event to thread (#9186)

* [Fix-9065] [master] when task submit failed, remove from queue and add state event to thread

This closes #9065

* Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

yes, your idea is better

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>

* [Bug-9065] [master] update logger info

Co-authored-by: guoshupei <guoshupei@lixiang.com>
Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>
3.0.0/version-upgrade
guoshupei 3 years ago committed by GitHub
parent
commit
3e5781b6e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -1637,7 +1637,9 @@ public class WorkflowExecuteThread {
stateEvent.setExecutionStatus(processInstance.getState()); stateEvent.setExecutionStatus(processInstance.getState());
stateEvent.setProcessInstanceId(this.processInstance.getId()); stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE); stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
this.processStateChangeHandler(stateEvent); // this.processStateChangeHandler(stateEvent);
// replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks
this.stateEvents.add(stateEvent);
} }
} }
@ -1800,6 +1802,11 @@ public class WorkflowExecuteThread {
TaskInstance taskInstance = submitTaskExec(task); TaskInstance taskInstance = submitTaskExec(task);
if (taskInstance == null) { if (taskInstance == null) {
this.taskFailedSubmit = true; this.taskFailedSubmit = true;
// Remove and add to complete map and error map
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
errorTaskMap.put(task.getTaskCode(), task.getId());
logger.error("process {}, task {}, code:{} submit task failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode());
} else { } else {
removeTaskFromStandbyList(task); removeTaskFromStandbyList(task);
} }

Loading…
Cancel
Save