From ff9bc806ac04d07ff0581e7a88514f943310d2c9 Mon Sep 17 00:00:00 2001 From: wind Date: Mon, 6 Dec 2021 18:20:56 +0800 Subject: [PATCH] [Bug-7206] [MasterServer] fix process instance always running when task timeout (#7207) * fix timeout * add task timeout map to avoid repeated timeout event * split task check list for retry and timeout Co-authored-by: caishunfeng <534328519@qq.com> --- .../master/runner/MasterSchedulerService.java | 11 +- .../runner/StateWheelExecuteThread.java | 101 ++++++++++-------- .../master/runner/WorkflowExecuteThread.java | 45 ++++++-- .../master/WorkflowExecuteThreadTest.java | 3 +- 4 files changed, 103 insertions(+), 57 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 00eab7876c..4b48e68cd1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.NettyRemotingClient; @@ -41,7 +40,6 @@ import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -117,10 +115,15 @@ public class 
MasterSchedulerService extends Thread { ConcurrentHashMap processTimeoutCheckList = new ConcurrentHashMap<>(); /** - * task time out checkout list + * task time out check list */ ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); + /** + * task retry check list + */ + ConcurrentHashMap taskRetryCheckList = new ConcurrentHashMap<>(); + private StateWheelExecuteThread stateWheelExecuteThread; /** @@ -134,6 +137,7 @@ public class MasterSchedulerService extends Thread { stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList, taskTimeoutCheckList, + taskRetryCheckList, this.processInstanceExecCacheManager, masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); } @@ -209,6 +213,7 @@ public class MasterSchedulerService extends Thread { , processAlertManager , masterConfig , taskTimeoutCheckList + , taskRetryCheckList , taskProcessorFactory); this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 84db5bcb10..d697ab127c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -43,18 +43,21 @@ public class StateWheelExecuteThread extends Thread { private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); - ConcurrentHashMap processInstanceCheckList; - ConcurrentHashMap taskInstanceCheckList; + private ConcurrentHashMap processInstanceTimeoutCheckList; + private ConcurrentHashMap taskInstanceTimeoutCheckList; + private ConcurrentHashMap taskInstanceRetryCheckList; private ProcessInstanceExecCacheManager 
processInstanceExecCacheManager; private int stateCheckIntervalSecs; - public StateWheelExecuteThread(ConcurrentHashMap processInstances, - ConcurrentHashMap taskInstances, + public StateWheelExecuteThread(ConcurrentHashMap processInstanceTimeoutCheckList, + ConcurrentHashMap taskInstanceTimeoutCheckList, + ConcurrentHashMap taskInstanceRetryCheckList, ProcessInstanceExecCacheManager processInstanceExecCacheManager, int stateCheckIntervalSecs) { - this.processInstanceCheckList = processInstances; - this.taskInstanceCheckList = taskInstances; + this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList; + this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList; + this.taskInstanceRetryCheckList = taskInstanceRetryCheckList; this.processInstanceExecCacheManager = processInstanceExecCacheManager; this.stateCheckIntervalSecs = stateCheckIntervalSecs; } @@ -65,8 +68,9 @@ public class StateWheelExecuteThread extends Thread { logger.info("state wheel thread start"); while (Stopper.isRunning()) { try { - checkProcess(); - checkTask(); + checkTask4Timeout(); + checkTask4Retry(); + checkProcess4Timeout(); } catch (Exception e) { logger.error("state wheel thread check error:", e); } @@ -74,85 +78,96 @@ public class StateWheelExecuteThread extends Thread { } } - public boolean addProcess(ProcessInstance processInstance) { - this.processInstanceCheckList.put(processInstance.getId(), processInstance); - return true; + public void addProcess4TimeoutCheck(ProcessInstance processInstance) { + this.processInstanceTimeoutCheckList.put(processInstance.getId(), processInstance); } - public boolean addTask(TaskInstance taskInstance) { - this.taskInstanceCheckList.put(taskInstance.getId(), taskInstance); - return true; + public void addTask4TimeoutCheck(TaskInstance taskInstance) { + this.taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance); + } + + public void addTask4RetryCheck(TaskInstance taskInstance) { + 
this.taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance); } - private void checkTask() { - if (taskInstanceCheckList.isEmpty()) { + public void checkTask4Timeout() { + if (taskInstanceTimeoutCheckList.isEmpty()) { return; } - - for (TaskInstance taskInstance : this.taskInstanceCheckList.values()) { + for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) { if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); - if (0 >= timeRemain && processTimeout(taskInstance)) { - taskInstanceCheckList.remove(taskInstance.getId()); + if (0 >= timeRemain) { + addTaskTimeoutEvent(taskInstance); + taskInstanceTimeoutCheckList.remove(taskInstance.getId()); } } + } + } + + private void checkTask4Retry() { + if (taskInstanceRetryCheckList.isEmpty()) { + return; + } + + for (TaskInstance taskInstance : this.taskInstanceRetryCheckList.values()) { if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) { - processDependCheck(taskInstance); - taskInstanceCheckList.remove(taskInstance.getId()); + addTaskStateChangeEvent(taskInstance); + taskInstanceRetryCheckList.remove(taskInstance.getId()); } if (taskInstance.isSubProcess() || taskInstance.isDependTask()) { - processDependCheck(taskInstance); + addTaskStateChangeEvent(taskInstance); } } } - private void checkProcess() { - if (processInstanceCheckList.isEmpty()) { + private void checkProcess4Timeout() { + if (processInstanceTimeoutCheckList.isEmpty()) { return; } - for (ProcessInstance processInstance : this.processInstanceCheckList.values()) { + for (ProcessInstance processInstance : this.processInstanceTimeoutCheckList.values()) { long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); - if (0 <= timeRemain && 
processTimeout(processInstance)) { - processInstanceCheckList.remove(processInstance.getId()); + if (0 >= timeRemain) { + addProcessTimeoutEvent(processInstance); + processInstanceTimeoutCheckList.remove(processInstance.getId()); } } } - private void putEvent(StateEvent stateEvent) { - - if (!processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) { - return; - } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); - workflowExecuteThread.addStateEvent(stateEvent); - } - - private boolean processDependCheck(TaskInstance taskInstance) { + private boolean addTaskStateChangeEvent(TaskInstance taskInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); - putEvent(stateEvent); + addEvent(stateEvent); return true; } - private boolean processTimeout(TaskInstance taskInstance) { + private boolean addTaskTimeoutEvent(TaskInstance taskInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.TASK_TIMEOUT); stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); stateEvent.setTaskInstanceId(taskInstance.getId()); - putEvent(stateEvent); + addEvent(stateEvent); return true; } - private boolean processTimeout(ProcessInstance processInstance) { + private boolean addProcessTimeoutEvent(ProcessInstance processInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.PROCESS_TIMEOUT); stateEvent.setProcessInstanceId(processInstance.getId()); - putEvent(stateEvent); + addEvent(stateEvent); return true; } + private void addEvent(StateEvent stateEvent) { + if (!processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) { + return; + } + 
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); + workflowExecuteThread.addStateEvent(stateEvent); + } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 8a594093be..9116ce5771 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -205,6 +205,11 @@ public class WorkflowExecuteThread implements Runnable { */ private ConcurrentHashMap taskTimeoutCheckList; + /** + * task retry check list + */ + private ConcurrentHashMap taskRetryCheckList; + /** * state event queue */ @@ -232,14 +237,15 @@ public class WorkflowExecuteThread implements Runnable { , ProcessAlertManager processAlertManager , MasterConfig masterConfig , ConcurrentHashMap taskTimeoutCheckList + , ConcurrentHashMap taskRetryCheckList , TaskProcessorFactory taskProcessorFactory) { this.processService = processService; - this.processInstance = processInstance; this.masterConfig = masterConfig; this.nettyExecutorManager = nettyExecutorManager; this.processAlertManager = processAlertManager; this.taskTimeoutCheckList = taskTimeoutCheckList; + this.taskRetryCheckList = taskRetryCheckList; this.taskProcessorFactory = taskProcessorFactory; } @@ -378,11 +384,10 @@ public class WorkflowExecuteThread implements Runnable { if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy) { ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); taskProcessor.action(TaskAction.TIMEOUT); - return false; } else { processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, 
taskInstance.getTaskDefine()); - return true; } + return true; } private boolean processTimeout() { @@ -415,7 +420,7 @@ public class WorkflowExecuteThread implements Runnable { this.stateEvents.add(nextEvent); } else { ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); - this.processService.sendStartTask2Master(processInstance,nextTaskInstance.getId(), + this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); } } @@ -450,6 +455,7 @@ public class WorkflowExecuteThread implements Runnable { task.getMaxRetryTimes(), task.getRetryInterval()); this.addTimeoutCheck(task); + this.addRetryCheck(task); } else { submitStandByTask(); } @@ -459,6 +465,7 @@ public class WorkflowExecuteThread implements Runnable { completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); activeTaskProcessorMaps.remove(task.getId()); taskTimeoutCheckList.remove(task.getId()); + taskRetryCheckList.remove(task.getId()); if (task.getState().typeIsSuccess()) { processInstance.setVarPool(task.getVarPool()); @@ -826,6 +833,7 @@ public class WorkflowExecuteThread implements Runnable { taskProcessor.run(); addTimeoutCheck(taskInstance); + addRetryCheck(taskInstance); if (taskProcessor.taskState().typeIsFinished()) { StateEvent stateEvent = new StateEvent(); @@ -867,13 +875,30 @@ public class WorkflowExecuteThread implements Runnable { logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); return; } - - if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) { + if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) { this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); - } else { - if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { - this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); - } + } + if 
(taskInstance.isDependTask() || taskInstance.isSubProcess()) { + this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); + } + } + + private void addRetryCheck(TaskInstance taskInstance) { + if (taskRetryCheckList.containsKey(taskInstance.getId())) { + return; + } + TaskDefinition taskDefinition = taskInstance.getTaskDefine(); + if (taskDefinition == null) { + logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); + return; + } + + if (taskInstance.taskCanRetry()) { + this.taskRetryCheckList.put(taskInstance.getId(), taskInstance); + } + + if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { + this.taskRetryCheckList.put(taskInstance.getId(), taskInstance); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 48c1f84a13..936b70f74d 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -108,7 +108,8 @@ public class WorkflowExecuteThreadTest { Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskProcessorFactory)); + ConcurrentHashMap taskRetryCheckList = new ConcurrentHashMap<>(); + workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList, taskProcessorFactory)); // prepareProcess init dag Field dag = WorkflowExecuteThread.class.getDeclaredField("dag"); dag.setAccessible(true);