diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 5576f13050..5c569b4744 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -73,11 +73,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread { */ private final ConcurrentLinkedQueue taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>(); - /** - * task state check list - */ - private final ConcurrentLinkedQueue taskInstanceStateCheckList = new ConcurrentLinkedQueue<>(); - @Autowired private MasterConfig masterConfig; @@ -104,7 +99,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread { try { checkTask4Timeout(); checkTask4Retry(); - checkTask4State(); checkProcess4Timeout(); } catch (Exception e) { log.error("state wheel thread check error:", e); @@ -214,30 +208,10 @@ public class StateWheelExecuteThread extends BaseDaemonThread { log.info("remove task instance from retry check list"); } - public void addTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { - log.info("Adding task instance into state check list"); - TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); - if (taskInstanceStateCheckList.contains(taskInstanceKey)) { - log.warn("Task instance is already in state check list"); - return; - } - if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { - taskInstanceStateCheckList.add(taskInstanceKey); - log.info("Added task instance into state check list"); - } - } - - public void removeTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { - TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); - taskInstanceStateCheckList.remove(taskInstanceKey); - log.info("Removed task instance from state check list"); - } - public void clearAllTasks() { processInstanceTimeoutCheckList.clear(); taskInstanceTimeoutCheckList.clear(); taskInstanceRetryCheckList.clear(); - taskInstanceStateCheckList.clear(); } private void checkTask4Timeout() { @@ -352,56 +326,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread { } } - private void checkTask4State() { - if (taskInstanceStateCheckList.isEmpty()) { - return; - } - for (TaskInstanceKey taskInstanceKey : taskInstanceStateCheckList) { - int processInstanceId = taskInstanceKey.getProcessInstanceId(); - long taskCode = taskInstanceKey.getTaskCode(); - - try { - LogUtils.setTaskInstanceIdMDC(processInstanceId); - WorkflowExecuteRunnable workflowExecuteThread = - processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); - if (workflowExecuteThread == null) { - log.warn( - "Task instance state check failed, can not find workflowExecuteThread from cache manager, will remove this check task"); - taskInstanceStateCheckList.remove(taskInstanceKey); - continue; - } - Optional taskInstanceOptional = - workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); - if (!taskInstanceOptional.isPresent()) { - log.warn( - "Task instance state check failed, can not find taskInstance from workflowExecuteThread, will remove this check event"); - taskInstanceStateCheckList.remove(taskInstanceKey); - continue; - } - TaskInstance taskInstance = taskInstanceOptional.get(); - if (taskInstance.getState().isFinished()) { - continue; - } - addTaskStateChangeEvent(taskInstance); - } catch (Exception ex) { - log.error("Task state check error, taskInstanceKey: {}", taskInstanceKey, ex); - } finally { - LogUtils.removeWorkflowInstanceIdMDC(); - } - } - } - - private void addTaskStateChangeEvent(TaskInstance taskInstance) { - TaskStateEvent stateEvent = TaskStateEvent.builder() - .processInstanceId(taskInstance.getProcessInstanceId()) - .taskInstanceId(taskInstance.getId()) - .taskCode(taskInstance.getTaskCode()) - .type(StateEventType.TASK_STATE_CHANGE) - .status(TaskExecutionStatus.RUNNING_EXECUTION) - .build(); - workflowExecuteThreadPool.submitStateEvent(stateEvent); - } - private void addProcessStopEvent(ProcessInstance processInstance) { WorkflowStateEvent stateEvent = WorkflowStateEvent.builder() .processInstanceId(processInstance.getId()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index c7b7a37bac..6bc43ce69f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -442,7 +442,6 @@ public class WorkflowExecuteRunnable implements Callable { taskExecuteRunnableMap.remove(taskInstance.getTaskCode()); stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance); - stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance); if (taskInstance.getState().isSuccess()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); @@ -1043,7 +1042,6 @@ public class WorkflowExecuteRunnable implements Callable { taskExecuteRunnable.dispatch(); stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance); - stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance); return true; } } catch (Exception e) {