From c2d0acd7139e5472da0de0b8e25636396442c0f5 Mon Sep 17 00:00:00 2001 From: wind Date: Mon, 6 Dec 2021 20:40:54 +0800 Subject: [PATCH] fix timeout (#7222) Co-authored-by: caishunfeng <534328519@qq.com> --- .../master/runner/MasterSchedulerService.java | 11 +- .../runner/StateWheelExecuteThread.java | 101 ++++++++++------- .../master/runner/WorkflowExecuteThread.java | 105 ++++++++++++++---- .../master/WorkflowExecuteThreadTest.java | 7 +- 4 files changed, 149 insertions(+), 75 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 803ba09bcc..22ae759bcb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -108,6 +108,11 @@ public class MasterSchedulerService extends Thread { */ ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); + /** + * task retry check list + */ + ConcurrentHashMap taskRetryCheckList = new ConcurrentHashMap<>(); + /** * key:code-version * value: processDefinition @@ -127,6 +132,7 @@ public class MasterSchedulerService extends Thread { stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList, taskTimeoutCheckList, + taskRetryCheckList, this.processInstanceExecMaps, masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); } @@ -176,8 +182,6 @@ public class MasterSchedulerService extends Thread { /** * 1. get command by slot * 2. donot handle command if slot is empty - * - * @throws Exception */ private void scheduleProcess() throws Exception { @@ -201,7 +205,8 @@ public class MasterSchedulerService extends Thread { , nettyExecutorManager , processAlertManager , masterConfig - , taskTimeoutCheckList); + , taskTimeoutCheckList + , taskRetryCheckList); this.processInstanceExecMaps.put(processInstance.getId(), workflowExecuteThread); if (processInstance.getTimeout() > 0) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 50406cc27d..d0e9d94da6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -42,18 +42,21 @@ public class StateWheelExecuteThread extends Thread { private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); - ConcurrentHashMap processInstanceCheckList; - ConcurrentHashMap taskInstanceCheckList; + private ConcurrentHashMap processInstanceTimeoutCheckList; + private ConcurrentHashMap taskInstanceTimeoutCheckList; + private ConcurrentHashMap taskInstanceRetryCheckList; private ConcurrentHashMap processInstanceExecMaps; private int stateCheckIntervalSecs; - public StateWheelExecuteThread(ConcurrentHashMap processInstances, - ConcurrentHashMap taskInstances, + public StateWheelExecuteThread(ConcurrentHashMap processInstanceTimeoutCheckList, + ConcurrentHashMap taskInstanceTimeoutCheckList, + ConcurrentHashMap taskInstanceRetryCheckList, ConcurrentHashMap processInstanceExecMaps, int stateCheckIntervalSecs) { - this.processInstanceCheckList = processInstances; - this.taskInstanceCheckList = taskInstances; + this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList; + this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList; + this.taskInstanceRetryCheckList = taskInstanceRetryCheckList; this.processInstanceExecMaps = processInstanceExecMaps; this.stateCheckIntervalSecs = stateCheckIntervalSecs; } @@ -64,8 +67,9 @@ public class StateWheelExecuteThread extends Thread { logger.info("state wheel thread start"); while (Stopper.isRunning()) { try { - checkProcess(); - checkTask(); + checkTask4Timeout(); + checkTask4Retry(); + checkProcess4Timeout(); } catch (Exception e) { logger.error("state wheel thread check error:", e); } @@ -73,85 +77,96 @@ public class StateWheelExecuteThread extends Thread { } } - public boolean addProcess(ProcessInstance processInstance) { - this.processInstanceCheckList.put(processInstance.getId(), processInstance); - return true; + public void addProcess4TimeoutCheck(ProcessInstance processInstance) { + this.processInstanceTimeoutCheckList.put(processInstance.getId(), processInstance); } - public boolean addTask(TaskInstance taskInstance) { - this.taskInstanceCheckList.put(taskInstance.getId(), taskInstance); - return true; + public void addTask4TimeoutCheck(TaskInstance taskInstance) { + this.taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance); + } + + public void addTask4RetryCheck(TaskInstance taskInstance) { + this.taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance); } - private void checkTask() { - if (taskInstanceCheckList.isEmpty()) { + public void checkTask4Timeout() { + if (taskInstanceTimeoutCheckList.isEmpty()) { return; } - - for (TaskInstance taskInstance : this.taskInstanceCheckList.values()) { + for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) { if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); - if (0 >= timeRemain && processTimeout(taskInstance)) { - taskInstanceCheckList.remove(taskInstance.getId()); + if (0 >= timeRemain) { + addTaskTimeoutEvent(taskInstance); + taskInstanceTimeoutCheckList.remove(taskInstance.getId()); } } + } + } + + private void checkTask4Retry() { + if (taskInstanceRetryCheckList.isEmpty()) { + return; + } + + for (TaskInstance taskInstance : this.taskInstanceRetryCheckList.values()) { if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) { - processDependCheck(taskInstance); - taskInstanceCheckList.remove(taskInstance.getId()); + addTaskStateChangeEvent(taskInstance); + taskInstanceRetryCheckList.remove(taskInstance.getId()); } if (taskInstance.isSubProcess() || taskInstance.isDependTask()) { - processDependCheck(taskInstance); + addTaskStateChangeEvent(taskInstance); } } } - private void checkProcess() { - if (processInstanceCheckList.isEmpty()) { + private void checkProcess4Timeout() { + if (processInstanceTimeoutCheckList.isEmpty()) { return; } - for (ProcessInstance processInstance : this.processInstanceCheckList.values()) { + for (ProcessInstance processInstance : this.processInstanceTimeoutCheckList.values()) { long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); - if (0 <= timeRemain && processTimeout(processInstance)) { - processInstanceCheckList.remove(processInstance.getId()); + if (0 >= timeRemain) { + addProcessTimeoutEvent(processInstance); + processInstanceTimeoutCheckList.remove(processInstance.getId()); } } } - private void putEvent(StateEvent stateEvent) { - - if (!processInstanceExecMaps.containsKey(stateEvent.getProcessInstanceId())) { - return; - } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId()); - workflowExecuteThread.addStateEvent(stateEvent); - } - - private boolean processDependCheck(TaskInstance taskInstance) { + private boolean addTaskStateChangeEvent(TaskInstance taskInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); - putEvent(stateEvent); + addEvent(stateEvent); return true; } - private boolean processTimeout(TaskInstance taskInstance) { + private boolean addTaskTimeoutEvent(TaskInstance taskInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.TASK_TIMEOUT); stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); stateEvent.setTaskInstanceId(taskInstance.getId()); - putEvent(stateEvent); + addEvent(stateEvent); return true; } - private boolean processTimeout(ProcessInstance processInstance) { + private boolean addProcessTimeoutEvent(ProcessInstance processInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.PROCESS_TIMEOUT); stateEvent.setProcessInstanceId(processInstance.getId()); - putEvent(stateEvent); + addEvent(stateEvent); return true; } + private void addEvent(StateEvent stateEvent) { + if (!processInstanceExecMaps.contains(stateEvent.getProcessInstanceId())) { + return; + } + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId()); + workflowExecuteThread.addStateEvent(stateEvent); + } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 1e45ec4824..53c0bf49d3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -17,13 +17,25 @@ package org.apache.dolphinscheduler.server.master.runner; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.Lists; -import com.google.common.collect.Table; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; + import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.Direct; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -33,7 +45,13 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.dao.entity.Environment; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; import org.apache.dolphinscheduler.remote.utils.Host; @@ -46,14 +64,29 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.*; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import static org.apache.dolphinscheduler.common.Constants.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Lists; +import com.google.common.collect.Table; /** * master exec thread,split dag @@ -153,9 +186,13 @@ public class WorkflowExecuteThread implements Runnable { private ConcurrentHashMap taskTimeoutCheckList; + /** + * task retry check list + */ + private ConcurrentHashMap taskRetryCheckList; + /** * start flag, true: start nodes submit completely - * */ private boolean isStart = false; @@ -165,14 +202,14 @@ public class WorkflowExecuteThread implements Runnable { * @param processInstance processInstance * @param processService processService * @param nettyExecutorManager nettyExecutorManager - * @param taskTimeoutCheckList */ public WorkflowExecuteThread(ProcessInstance processInstance , ProcessService processService , NettyExecutorManager nettyExecutorManager , ProcessAlertManager processAlertManager , MasterConfig masterConfig - , ConcurrentHashMap taskTimeoutCheckList) { + , ConcurrentHashMap taskTimeoutCheckList + , ConcurrentHashMap taskRetryCheckList) { this.processService = processService; this.processInstance = processInstance; @@ -180,6 +217,7 @@ public class WorkflowExecuteThread implements Runnable { this.nettyExecutorManager = nettyExecutorManager; this.processAlertManager = processAlertManager; this.taskTimeoutCheckList = taskTimeoutCheckList; + this.taskRetryCheckList = taskRetryCheckList; } @Override @@ -197,7 +235,6 @@ public class WorkflowExecuteThread implements Runnable { /** * the process start nodes are submitted completely. - * @return */ public boolean isStart() { return this.isStart; @@ -296,11 +333,10 @@ public class WorkflowExecuteThread implements Runnable { if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy) { ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); taskProcessor.action(TaskAction.TIMEOUT); - return false; } else { processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, taskInstance.getTaskDefine()); - return true; } + return true; } private boolean processTimeout() { @@ -342,6 +378,7 @@ public class WorkflowExecuteThread implements Runnable { task.getMaxRetryTimes(), task.getRetryInterval()); this.addTimeoutCheck(task); + this.addRetryCheck(task); } else { submitStandByTask(); } @@ -351,6 +388,7 @@ public class WorkflowExecuteThread implements Runnable { completeTaskList.put(Long.toString(task.getTaskCode()), task); activeTaskProcessorMaps.remove(task.getId()); taskTimeoutCheckList.remove(task.getId()); + taskRetryCheckList.remove(task.getId()); if (task.getState().typeIsSuccess()) { processInstance.setVarPool(task.getVarPool()); processService.saveProcessInstance(processInstance); @@ -497,7 +535,7 @@ public class WorkflowExecuteThread implements Runnable { processInstance.getProcessDefinitionVersion()); recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); List taskNodeList = - processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList()); + processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList()); forbiddenTaskList.clear(); taskNodeList.forEach(taskNode -> { @@ -585,6 +623,7 @@ public class WorkflowExecuteThread implements Runnable { activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor); taskProcessor.run(); addTimeoutCheck(taskInstance); + addRetryCheck(taskInstance); TaskDefinition taskDefinition = processService.findTaskDefinition( taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); @@ -635,15 +674,34 @@ public class WorkflowExecuteThread implements Runnable { taskInstance.getTaskDefinitionVersion() ); taskInstance.setTaskDefine(taskDefinition); - if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) { + if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) { + this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); + } + if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); - } else { - if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { - this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); - } } } + private void addRetryCheck(TaskInstance taskInstance) { + if (taskRetryCheckList.containsKey(taskInstance.getId())) { + return; + } + TaskDefinition taskDefinition = taskInstance.getTaskDefine(); + if (taskDefinition == null) { + logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); + return; + } + + if (taskInstance.taskCanRetry()) { + this.taskRetryCheckList.put(taskInstance.getId(), taskInstance); + } + + if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { + this.taskRetryCheckList.put(taskInstance.getId(), taskInstance); + } + } + + /** * find task instance in db. * in case submit more than one same name task in the same time. @@ -991,7 +1049,6 @@ public class WorkflowExecuteThread implements Runnable { /** * generate the latest process instance status by the tasks state * - * @param instance * @return process instance execution status */ private ExecutionStatus getProcessInstanceState(ProcessInstance instance) { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 3bd66c6a78..bf527d22b3 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -22,8 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.mock; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -34,7 +32,6 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; @@ -47,7 +44,6 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -105,7 +101,8 @@ public class WorkflowExecuteThreadTest { Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList)); + ConcurrentHashMap taskRetryCheckList = new ConcurrentHashMap<>(); + workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList)); // prepareProcess init dag Field dag = WorkflowExecuteThread.class.getDeclaredField("dag"); dag.setAccessible(true);