diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 3abfdd2ac2..f19dfbe447 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -250,18 +250,29 @@ public class StateWheelExecuteThread extends Thread { if (taskInstanceRetryCheckList.isEmpty()) { return; } + for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) { int processInstanceId = taskInstanceKey.getProcessInstanceId(); long taskCode = taskInstanceKey.getTaskCode(); WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteThread == null) { logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode); taskInstanceRetryCheckList.remove(taskInstanceKey); continue; } + TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode); + ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); + + if (processInstance.getState() == ExecutionStatus.READY_STOP) { + addProcessStopEvent(processInstance); + taskInstanceRetryCheckList.remove(taskInstanceKey); + break; + } + if (taskInstance == null) { logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode); @@ -320,6 +331,14 @@ public class StateWheelExecuteThread extends Thread { workflowExecuteThreadPool.submitStateEvent(stateEvent); } + private void addProcessStopEvent(ProcessInstance processInstance) { + StateEvent stateEvent = new StateEvent(); + stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE); + stateEvent.setProcessInstanceId(processInstance.getId()); + stateEvent.setExecutionStatus(ExecutionStatus.STOP); + workflowExecuteThreadPool.submitStateEvent(stateEvent); + } + private void addTaskRetryEvent(TaskInstance taskInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.TASK_RETRY); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 2bc16a3f9f..676bb030d8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -441,7 +441,7 @@ public class WorkflowExecuteThread { processInstance.setVarPool(taskInstance.getVarPool()); processService.saveProcessInstance(processInstance); submitPostNode(Long.toString(taskInstance.getTaskCode())); - } else if (taskInstance.taskCanRetry()) { + } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) { // retry task retryTaskInstance(taskInstance); } else if (taskInstance.getState().typeIsFailure()) { @@ -490,7 +490,7 @@ public class WorkflowExecuteThread { if (!taskInstance.taskCanRetry()) { return; } - TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); + TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); if (newTaskInstance == null) { logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId()); return; @@ -577,6 +577,7 @@ public class WorkflowExecuteThread { logger.error("task instance id null, state event:{}", stateEvent); return false; } + if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) { logger.error("mismatch task instance id, event:{}", stateEvent); return false; @@ -636,6 +637,12 @@ public class WorkflowExecuteThread { private boolean processStateChangeHandler(StateEvent stateEvent) { try { logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus()); + + if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) { + this.updateProcessInstanceState(stateEvent); + return true; + } + if (processComplementData()) { return true; } @@ -1441,10 +1448,6 @@ public class WorkflowExecuteThread { // active task and retry task exists return runningState(state); } - // process failure - if (processFailed()) { - return ExecutionStatus.FAILURE; - } // waiting thread if (hasWaitingThreadTask()) { @@ -1460,8 +1463,10 @@ public class WorkflowExecuteThread { if (state == ExecutionStatus.READY_STOP) { List stopList = getCompleteTaskByState(ExecutionStatus.STOP); List killList = getCompleteTaskByState(ExecutionStatus.KILL); + List faillist = getCompleteTaskByState(ExecutionStatus.FAILURE); if (CollectionUtils.isNotEmpty(stopList) || CollectionUtils.isNotEmpty(killList) + || CollectionUtils.isNotEmpty(faillist) || !isComplementEnd()) { return ExecutionStatus.STOP; } else { @@ -1469,6 +1474,11 @@ public class WorkflowExecuteThread { } } + // process failure + if (processFailed()) { + return ExecutionStatus.FAILURE; + } + // success if (state == ExecutionStatus.RUNNING_EXECUTION) { List killTasks = getCompleteTaskByState(ExecutionStatus.KILL); @@ -1534,6 +1544,26 @@ public class WorkflowExecuteThread { } } + /** + * stateEvent's execution status as process instance state + */ + private void updateProcessInstanceState(StateEvent stateEvent) { + ExecutionStatus state = stateEvent.getExecutionStatus(); + if (processInstance.getState() != state) { + logger.info( + "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", + processInstance.getId(), processInstance.getName(), + processInstance.getState(), state, + processInstance.getCommandType()); + + processInstance.setState(state); + if (state.typeIsFinished()) { + processInstance.setEndTime(new Date()); + } + processService.updateProcessInstance(processInstance); + } + } + /** * get task dependency result * @@ -1607,6 +1637,11 @@ public class WorkflowExecuteThread { private void killAllTasks() { logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(), activeTaskProcessorMaps.size()); + + if (readyToSubmitTaskQueue.size() > 0) { + readyToSubmitTaskQueue.clear(); + } + for (long taskCode : activeTaskProcessorMaps.keySet()) { ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode); Integer taskInstanceId = validTaskMap.get(taskCode); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index d6251d5400..56906cdc26 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1162,7 +1162,7 @@ public class ProcessService { logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); Thread.sleep(commitInterval); } catch (Exception e) { - logger.error("task commit to mysql failed", e); + logger.error("task commit to db failed", e); } retryTimes += 1; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index b558d42405..7502607bcf 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -104,6 +104,14 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue