diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 7a15a43d2b..16604662ac 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -281,14 +281,14 @@ public class WorkflowExecuteRunnable implements Callable { try { stateEvent = this.stateEvents.peek(); LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), - stateEvent.getTaskInstanceId()); + stateEvent.getTaskInstanceId()); // if state handle success then will remove this state, otherwise will retry this state next time. // The state should always handle success except database error. checkProcessInstance(stateEvent); - StateEventHandler stateEventHandler - = StateEventHandlerManager.getStateEventHandler(stateEvent.getType()) - .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event")); + StateEventHandler stateEventHandler = + StateEventHandlerManager.getStateEventHandler(stateEvent.getType()) + .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event")); if (stateEventHandler.handleStateEvent(this, stateEvent)) { this.stateEvents.remove(stateEvent); } @@ -298,14 +298,14 @@ public class WorkflowExecuteRunnable implements Callable { ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } catch (StateEventHandleException stateEventHandleException) { logger.error("State event handle error, will retry this event: {}", - stateEvent, - stateEventHandleException); + stateEvent, + stateEventHandleException); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } catch (Exception e) { // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue. logger.error("State event handle error, get a unknown exception, will retry this event: {}", - stateEvent, - e); + stateEvent, + e); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); @@ -350,7 +350,7 @@ public class WorkflowExecuteRunnable implements Callable { ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.DISPATCH); this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); + TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); return true; } if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { @@ -454,8 +454,8 @@ public class WorkflowExecuteRunnable implements Callable { TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); if (newTaskInstance == null) { logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", - taskInstance.getTaskCode(), - taskInstance.getId()); + taskInstance.getTaskCode(), + taskInstance.getId()); return; } waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); @@ -787,8 +787,8 @@ public class WorkflowExecuteRunnable implements Callable { errorTaskMap.clear(); if (!isNewProcessInstance()) { - List validTaskInstanceList - = processService.findValidTaskListByProcessId(processInstance.getId()); + List validTaskInstanceList = + processService.findValidTaskListByProcessId(processInstance.getId()); for (TaskInstance task : validTaskInstanceList) { if (validTaskMap.containsKey(task.getTaskCode())) { int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); @@ -799,7 +799,7 @@ public class WorkflowExecuteRunnable implements Callable { continue; } logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", - task.getTaskCode()); + task.getTaskCode()); } validTaskMap.put(task.getTaskCode(), task.getId()); @@ -1193,10 +1193,8 @@ public class WorkflowExecuteRunnable implements Callable { } private void submitPostNode(String parentNodeCode) throws StateEventHandleException { - Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, - skipTaskNodeMap, - dag, - getCompleteTaskInstanceMap()); + Set submitTaskNodeList = + DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); List taskInstances = new ArrayList<>(); for (String taskNode : submitTaskNodeList) { TaskNode taskNodeObject = dag.getNode(taskNode); @@ -1859,7 +1857,6 @@ public class WorkflowExecuteRunnable implements Callable { return waitToRetryTaskInstanceMap; } - private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam) { // get start params from command param Map startParamMap = new HashMap<>();