Browse Source

[Fix-10785] Fix state events not being retried after a handling error (#10786)

* Fix state events not being retried after a handling error

* Use state event handler to deal with the event

(cherry picked from commit 67d14fb7b3)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
4fc9bce444
  1. 35
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

35
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -281,14 +281,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
try { try {
stateEvent = this.stateEvents.peek(); stateEvent = this.stateEvents.peek();
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
stateEvent.getTaskInstanceId()); stateEvent.getTaskInstanceId());
// if state handle success then will remove this state, otherwise will retry this state next time. // if state handle success then will remove this state, otherwise will retry this state next time.
// The state should always handle success except database error. // The state should always handle success except database error.
checkProcessInstance(stateEvent); checkProcessInstance(stateEvent);
StateEventHandler stateEventHandler StateEventHandler stateEventHandler =
= StateEventHandlerManager.getStateEventHandler(stateEvent.getType()) StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
.orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event")); .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
if (stateEventHandler.handleStateEvent(this, stateEvent)) { if (stateEventHandler.handleStateEvent(this, stateEvent)) {
this.stateEvents.remove(stateEvent); this.stateEvents.remove(stateEvent);
} }
@ -298,14 +298,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (StateEventHandleException stateEventHandleException) { } catch (StateEventHandleException stateEventHandleException) {
logger.error("State event handle error, will retry this event: {}", logger.error("State event handle error, will retry this event: {}",
stateEvent, stateEvent,
stateEventHandleException); stateEventHandleException);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) { } catch (Exception e) {
// we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue. // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
logger.error("State event handle error, get a unknown exception, will retry this event: {}", logger.error("State event handle error, get a unknown exception, will retry this event: {}",
stateEvent, stateEvent,
e); e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally { } finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@ -350,7 +350,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.DISPATCH); taskProcessor.action(TaskAction.DISPATCH);
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
return true; return true;
} }
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
@ -454,8 +454,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) { if (newTaskInstance == null) {
logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}",
taskInstance.getTaskCode(), taskInstance.getTaskCode(),
taskInstance.getId()); taskInstance.getId());
return; return;
} }
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
@ -787,8 +787,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
errorTaskMap.clear(); errorTaskMap.clear();
if (!isNewProcessInstance()) { if (!isNewProcessInstance()) {
List<TaskInstance> validTaskInstanceList List<TaskInstance> validTaskInstanceList =
= processService.findValidTaskListByProcessId(processInstance.getId()); processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) { for (TaskInstance task : validTaskInstanceList) {
if (validTaskMap.containsKey(task.getTaskCode())) { if (validTaskMap.containsKey(task.getTaskCode())) {
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
@ -799,7 +799,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
continue; continue;
} }
logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
task.getTaskCode()); task.getTaskCode());
} }
validTaskMap.put(task.getTaskCode(), task.getId()); validTaskMap.put(task.getTaskCode(), task.getId());
@ -1193,10 +1193,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
private void submitPostNode(String parentNodeCode) throws StateEventHandleException { private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, Set<String> submitTaskNodeList =
skipTaskNodeMap, DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
dag,
getCompleteTaskInstanceMap());
List<TaskInstance> taskInstances = new ArrayList<>(); List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) { for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode); TaskNode taskNodeObject = dag.getNode(taskNode);
@ -1859,7 +1857,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return waitToRetryTaskInstanceMap; return waitToRetryTaskInstanceMap;
} }
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) { private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
// get start params from command param // get start params from command param
Map<String, String> startParamMap = new HashMap<>(); Map<String, String> startParamMap = new HashMap<>();

Loading…
Cancel
Save