diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java index f24b3c1546..7cc806a8a5 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java @@ -35,6 +35,8 @@ public class StateEvent { private int taskInstanceId; + private long taskCode; + private int processInstanceId; private String context; @@ -53,6 +55,10 @@ public class StateEvent { return taskInstanceId; } + public long getTaskCode() { + return taskCode; + } + public int getProcessInstanceId() { return processInstanceId; } @@ -73,6 +79,10 @@ public class StateEvent { this.taskInstanceId = taskInstanceId; } + public void setTaskCode(long taskCode) { + this.taskCode = taskCode; + } + public Channel getChannel() { return channel; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java index fc795d4b09..c758bc7b4e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java @@ -26,6 +26,7 @@ public enum StateEventType { PROCESS_TIMEOUT(2, "process timeout"), TASK_TIMEOUT(3, "task timeout"), WAIT_TASK_GROUP(4, "wait task group"), + TASK_RETRY(5, "task retry") ; StateEventType(int code, String descp) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 976060cd7a..b4e52c0f3d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.commons.lang3.SerializationUtils; + import java.io.Serializable; import java.util.Date; import java.util.Map; @@ -599,7 +601,8 @@ public class TaskInstance implements Serializable { } /** - * determine if you can try again + * determine if a task instance can retry + * if subProcess, * * @return can try result */ @@ -609,10 +612,8 @@ public class TaskInstance implements Serializable { } if (this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) { return true; - } else { - return (this.getState().typeIsFailure() - && this.getRetryTimes() < this.getMaxRetryTimes()); } + return this.getState() == ExecutionStatus.FAILURE && (this.getRetryTimes() < this.getMaxRetryTimes()); } /** @@ -624,9 +625,7 @@ public class TaskInstance implements Serializable { if (getState() != ExecutionStatus.FAILURE) { return true; } - if (getId() == 0 - || getMaxRetryTimes() == 0 - || getRetryInterval() == 0) { + if (getMaxRetryTimes() == 0 || getRetryInterval() == 0) { return true; } Date now = new Date(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 8c3712f838..3abfdd2ac2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner; +import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.StateEvent; @@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; import org.apache.hadoop.util.ThreadUtil; import java.util.Map.Entry; @@ -42,8 +44,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** - * 1. timeout check wheel - * 2. dependent task check wheel + * Check thread + * 1. timeout task check + * 2. dependent task state check + * 3. retry task check + * 4. timeout process check */ @Component public class StateWheelExecuteThread extends Thread { @@ -56,14 +61,19 @@ public class StateWheelExecuteThread extends Thread { private ConcurrentLinkedQueue processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>(); /** - * task time out check list, key is taskInstanceId, value is processInstanceId + * task time out check list */ - private ConcurrentHashMap taskInstanceTimeoutCheckList = new ConcurrentHashMap<>(); + private ConcurrentLinkedQueue taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>(); /** - * task retry check list, key is taskInstanceId, value is processInstanceId + * task retry check list */ - private ConcurrentHashMap taskInstanceRetryCheckList = new ConcurrentHashMap<>(); + private ConcurrentLinkedQueue taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>(); + + /** + * task state check list + */ + private ConcurrentLinkedQueue taskInstanceStateCheckList = new ConcurrentLinkedQueue<>(); @Autowired private MasterConfig masterConfig; @@ -80,6 +90,7 @@ public class StateWheelExecuteThread extends Thread { try { checkTask4Timeout(); checkTask4Retry(); + checkTask4State(); checkProcess4Timeout(); } catch (Exception e) { logger.error("state wheel thread check error:", e); @@ -96,8 +107,39 @@ public class StateWheelExecuteThread extends Thread { processInstanceTimeoutCheckList.remove(processInstance.getId()); } - public void addTask4TimeoutCheck(TaskInstance taskInstance) { - if (taskInstanceTimeoutCheckList.containsKey(taskInstance.getId())) { + private void checkProcess4Timeout() { + if (processInstanceTimeoutCheckList.isEmpty()) { + return; + } + for (Integer processInstanceId : processInstanceTimeoutCheckList) { + if (processInstanceId == null) { + continue; + } + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteThread == null) { + logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId); + processInstanceTimeoutCheckList.remove(processInstanceId); + continue; + } + ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); + if (processInstance == null) { + continue; + } + long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); + if (timeRemain < 0) { + addProcessTimeoutEvent(processInstance); + processInstanceTimeoutCheckList.remove(processInstance.getId()); + } + } + } + + public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); + if (taskInstanceKey == null) { + logger.error("taskInstanceKey is null"); + return; + } + if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) { return; } TaskDefinition taskDefinition = taskInstance.getTaskDefine(); @@ -106,19 +148,29 @@ public class StateWheelExecuteThread extends Thread { return; } if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) { - taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId()); + taskInstanceTimeoutCheckList.add(taskInstanceKey); } if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { - taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId()); + taskInstanceTimeoutCheckList.add(taskInstanceKey); } } - public void removeTask4TimeoutCheck(TaskInstance taskInstance) { - taskInstanceTimeoutCheckList.remove(taskInstance.getId()); + public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); + if (taskInstanceKey == null) { + logger.error("taskInstanceKey is null"); + return; + } + taskInstanceTimeoutCheckList.remove(taskInstanceKey); } - public void addTask4RetryCheck(TaskInstance taskInstance) { - if (taskInstanceRetryCheckList.containsKey(taskInstance.getId())) { + public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); + if (taskInstanceKey == null) { + logger.error("taskInstanceKey is null"); + return; + } + if (taskInstanceRetryCheckList.contains(taskInstanceKey)) { return; } TaskDefinition taskDefinition = taskInstance.getTaskDefine(); @@ -126,43 +178,69 @@ public class StateWheelExecuteThread extends Thread { logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); return; } - if (taskInstance.taskCanRetry()) { - taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId()); + logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId()); + taskInstanceRetryCheckList.add(taskInstanceKey); + } + + public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); + if (taskInstanceKey == null) { + logger.error("taskInstanceKey is null"); + return; } + taskInstanceRetryCheckList.remove(taskInstanceKey); + } + public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); + if (taskInstanceKey == null) { + logger.error("taskInstanceKey is null"); + return; + } + if (taskInstanceStateCheckList.contains(taskInstanceKey)) { + return; + } if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { - taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId()); + taskInstanceStateCheckList.add(taskInstanceKey); } } - public void removeTask4RetryCheck(TaskInstance taskInstance) { - taskInstanceRetryCheckList.remove(taskInstance.getId()); + public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); + if (taskInstanceKey == null) { + logger.error("taskInstanceKey is null"); + return; + } + taskInstanceStateCheckList.remove(taskInstanceKey); } private void checkTask4Timeout() { if (taskInstanceTimeoutCheckList.isEmpty()) { return; } - for (Entry entry : taskInstanceTimeoutCheckList.entrySet()) { - int processInstanceId = entry.getValue(); - int taskInstanceId = entry.getKey(); + for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) { + int processInstanceId = taskInstanceKey.getProcessInstanceId(); + long taskCode = taskInstanceKey.getTaskCode(); WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { - logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}", - processInstanceId, taskInstanceId); - taskInstanceTimeoutCheckList.remove(taskInstanceId); + logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", + processInstanceId, taskCode); + taskInstanceTimeoutCheckList.remove(taskInstanceKey); continue; } - TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId); + TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); if (taskInstance == null) { + logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", + processInstanceId, taskCode); + taskInstanceTimeoutCheckList.remove(taskInstanceKey); continue; } if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); if (timeRemain < 0) { addTaskTimeoutEvent(taskInstance); - taskInstanceTimeoutCheckList.remove(taskInstance.getId()); + taskInstanceTimeoutCheckList.remove(taskInstanceKey); } } } @@ -172,54 +250,63 @@ public class StateWheelExecuteThread extends Thread { if (taskInstanceRetryCheckList.isEmpty()) { return; } - for (Entry entry : taskInstanceRetryCheckList.entrySet()) { - int processInstanceId = entry.getValue(); - int taskInstanceId = entry.getKey(); + for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) { + int processInstanceId = taskInstanceKey.getProcessInstanceId(); + long taskCode = taskInstanceKey.getTaskCode(); WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { - logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}", - processInstanceId, taskInstanceId); - taskInstanceRetryCheckList.remove(taskInstanceId); + logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", + processInstanceId, taskCode); + taskInstanceRetryCheckList.remove(taskInstanceKey); continue; } - TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId); + TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode); if (taskInstance == null) { + logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", + processInstanceId, taskCode); + taskInstanceRetryCheckList.remove(taskInstanceKey); continue; } - if (!taskInstance.getState().typeIsFinished() && (taskInstance.isSubProcess() || taskInstance.isDependTask())) { - addTaskStateChangeEvent(taskInstance); - } else if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) { - addTaskStateChangeEvent(taskInstance); - taskInstanceRetryCheckList.remove(taskInstance.getId()); + if (taskInstance.retryTaskIntervalOverTime()) { + // reset taskInstance endTime and state + // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance + taskInstance.setEndTime(null); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + + addTaskRetryEvent(taskInstance); + taskInstanceRetryCheckList.remove(taskInstanceKey); } } } - private void checkProcess4Timeout() { - if (processInstanceTimeoutCheckList.isEmpty()) { + private void checkTask4State() { + if (taskInstanceStateCheckList.isEmpty()) { return; } - for (Integer processInstanceId : processInstanceTimeoutCheckList) { - if (processInstanceId == null) { - continue; - } + for (TaskInstanceKey taskInstanceKey : taskInstanceStateCheckList) { + int processInstanceId = taskInstanceKey.getProcessInstanceId(); + long taskCode = taskInstanceKey.getTaskCode(); + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { - logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId); - processInstanceTimeoutCheckList.remove(processInstanceId); + logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", + processInstanceId, taskCode); + taskInstanceStateCheckList.remove(taskInstanceKey); continue; } - ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); - if (processInstance == null) { + TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); + if (taskInstance == null) { + logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", + processInstanceId, taskCode); + taskInstanceStateCheckList.remove(taskInstanceKey); continue; } - long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); - if (timeRemain < 0) { - addProcessTimeoutEvent(processInstance); - processInstanceTimeoutCheckList.remove(processInstance.getId()); + if (taskInstance.getState().typeIsFinished()) { + continue; } + addTaskStateChangeEvent(taskInstance); } } @@ -228,6 +315,17 @@ public class StateWheelExecuteThread extends Thread { stateEvent.setType(StateEventType.TASK_STATE_CHANGE); stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); stateEvent.setTaskInstanceId(taskInstance.getId()); + stateEvent.setTaskCode(taskInstance.getTaskCode()); + stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + workflowExecuteThreadPool.submitStateEvent(stateEvent); + } + + private void addTaskRetryEvent(TaskInstance taskInstance) { + StateEvent stateEvent = new StateEvent(); + stateEvent.setType(StateEventType.TASK_RETRY); + stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); + stateEvent.setTaskInstanceId(taskInstance.getId()); + stateEvent.setTaskCode(taskInstance.getTaskCode()); stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); workflowExecuteThreadPool.submitStateEvent(stateEvent); } @@ -237,6 +335,7 @@ public class StateWheelExecuteThread extends Thread { stateEvent.setType(StateEventType.TASK_TIMEOUT); stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); stateEvent.setTaskInstanceId(taskInstance.getId()); + stateEvent.setTaskCode(taskInstance.getTaskCode()); workflowExecuteThreadPool.submitStateEvent(stateEvent); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 73e655f0ab..2bc16a3f9f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -159,34 +159,38 @@ public class WorkflowExecuteThread { private Map taskInstanceMap = new ConcurrentHashMap<>(); /** - * running TaskNode, taskId as key + * running taskProcessor, taskCode as key, taskProcessor as value + * only on taskProcessor per taskCode */ - private final Map activeTaskProcessorMaps = new ConcurrentHashMap<>(); + private final Map activeTaskProcessorMaps = new ConcurrentHashMap<>(); /** * valid task map, taskCode as key, taskId as value + * in a DAG, only one taskInstance per taskCode is valid */ - private Map validTaskMap = new ConcurrentHashMap<>(); + private Map validTaskMap = new ConcurrentHashMap<>(); /** - * error task map, taskCode as key, taskId as value + * error task map, taskCode as key, taskInstanceId as value + * in a DAG, only one taskInstance per taskCode is valid */ - private Map errorTaskMap = new ConcurrentHashMap<>(); + private Map errorTaskMap = new ConcurrentHashMap<>(); /** - * complete task map, taskCode as key, taskId as value + * complete task map, taskCode as key, taskInstanceId as value + * in a DAG, only one taskInstance per taskCode is valid */ - private Map completeTaskMap = new ConcurrentHashMap<>(); + private Map completeTaskMap = new ConcurrentHashMap<>(); /** * depend failed task map, taskCode as key, taskId as value */ - private Map dependFailedTaskMap = new ConcurrentHashMap<>(); + private Map dependFailedTaskMap = new ConcurrentHashMap<>(); /** * forbidden task map, code as key */ - private Map forbiddenTaskMap = new ConcurrentHashMap<>(); + private Map forbiddenTaskMap = new ConcurrentHashMap<>(); /** * skip task map, code as key @@ -208,6 +212,12 @@ public class WorkflowExecuteThread { */ private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); + /** + * wait to retry taskInstance map, taskCode as key, taskInstance as value + * before retry, the taskInstance id is 0 + */ + private Map waitToRetryTaskInstanceMap = new ConcurrentHashMap<>(); + /** * state wheel execute thread */ @@ -317,6 +327,9 @@ public class WorkflowExecuteThread { case WAIT_TASK_GROUP: result = checkForceStartAndWakeUp(stateEvent); break; + case TASK_RETRY: + result = taskRetryEventHandler(stateEvent); + break; default: break; } @@ -330,8 +343,8 @@ public class WorkflowExecuteThread { private boolean checkForceStartAndWakeUp(StateEvent stateEvent) { TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { - ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); + ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId()); taskProcessor.init(taskInstance, processInstance); taskProcessor.action(TaskAction.DISPATCH); @@ -341,8 +354,8 @@ public class WorkflowExecuteThread { if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue); if (acquireTaskGroup) { - ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); + ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId()); taskProcessor.init(taskInstance, processInstance); taskProcessor.action(TaskAction.DISPATCH); @@ -363,7 +376,7 @@ public class WorkflowExecuteThread { } TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy) { - ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); + ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.TIMEOUT); } else { processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, taskInstance.getTaskDefine()); @@ -387,79 +400,57 @@ public class WorkflowExecuteThread { return true; } - if (task.getState().typeIsFinished() && !completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) { + if (task.getState().typeIsFinished()) { + if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) { + return true; + } taskFinished(task); if (task.getTaskGroupId() > 0) { - //release task group - TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(task); - if (nextTaskInstance != null) { - if (nextTaskInstance.getProcessInstanceId() == task.getProcessInstanceId()) { - StateEvent nextEvent = new StateEvent(); - nextEvent.setProcessInstanceId(this.processInstance.getId()); - nextEvent.setTaskInstanceId(nextTaskInstance.getId()); - nextEvent.setType(StateEventType.WAIT_TASK_GROUP); - this.stateEvents.add(nextEvent); - } else { - ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); - this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), - org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); - } - } + releaseTaskGroup(task); } - } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) { - ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); + return true; + } + if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) { + ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode()); iTaskProcessor.action(TaskAction.RUN); - if (iTaskProcessor.taskState().typeIsFinished()) { - task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); + if (iTaskProcessor.taskInstance().getState().typeIsFinished()) { taskFinished(task); } - } else { - logger.error("state handler error: {}", stateEvent); + return true; } + logger.error("state handler error: {}", stateEvent); + return true; } - private void taskFinished(TaskInstance task) { - logger.info("work flow {} task {} state:{} ", + private void taskFinished(TaskInstance taskInstance) { + logger.info("work flow {} task id:{} code:{} state:{} ", processInstance.getId(), - task.getId(), - task.getState()); - if (task.taskCanRetry()) { - addTaskToStandByList(task); - if (!task.retryTaskIntervalOverTime()) { - logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}", - processInstance.getId(), - task.getId(), - task.getState(), - task.getRetryTimes(), - task.getMaxRetryTimes(), - task.getRetryInterval()); - stateWheelExecuteThread.addTask4TimeoutCheck(task); - stateWheelExecuteThread.addTask4RetryCheck(task); - } else { - submitStandByTask(); - stateWheelExecuteThread.removeTask4TimeoutCheck(task); - stateWheelExecuteThread.removeTask4RetryCheck(task); - } - return; - } + taskInstance.getId(), + taskInstance.getTaskCode(), + taskInstance.getState()); - completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); - activeTaskProcessorMaps.remove(task.getId()); - stateWheelExecuteThread.removeTask4TimeoutCheck(task); - stateWheelExecuteThread.removeTask4RetryCheck(task); + activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); + stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); + stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance); + stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance); - if (task.getState().typeIsSuccess()) { - processInstance.setVarPool(task.getVarPool()); + if (taskInstance.getState().typeIsSuccess()) { + completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + processInstance.setVarPool(taskInstance.getVarPool()); processService.saveProcessInstance(processInstance); - submitPostNode(Long.toString(task.getTaskCode())); - } else if (task.getState().typeIsFailure()) { - if (task.isConditionsTask() - || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) { - submitPostNode(Long.toString(task.getTaskCode())); + submitPostNode(Long.toString(taskInstance.getTaskCode())); + } else if (taskInstance.taskCanRetry()) { + // retry task + retryTaskInstance(taskInstance); + } else if (taskInstance.getState().typeIsFailure()) { + completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + if (taskInstance.isConditionsTask() + || DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) { + submitPostNode(Long.toString(taskInstance.getTaskCode())); } else { - errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); if (processInstance.getFailureStrategy() == FailureStrategy.END) { killAllTasks(); } @@ -468,6 +459,73 @@ public class WorkflowExecuteThread { this.updateProcessInstanceState(); } + /** + * release task group + * @param taskInstance + */ + private void releaseTaskGroup(TaskInstance taskInstance) { + if (taskInstance.getTaskGroupId() > 0) { + TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); + if (nextTaskInstance != null) { + if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) { + StateEvent nextEvent = new StateEvent(); + nextEvent.setProcessInstanceId(this.processInstance.getId()); + nextEvent.setTaskInstanceId(nextTaskInstance.getId()); + nextEvent.setType(StateEventType.WAIT_TASK_GROUP); + this.stateEvents.add(nextEvent); + } else { + ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); + this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), + org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); + } + } + } + } + + /** + * crate new task instance to retry, different objects from the original + * @param taskInstance + */ + private void retryTaskInstance(TaskInstance taskInstance) { + if (!taskInstance.taskCanRetry()) { + return; + } + TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); + if (newTaskInstance == null) { + logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId()); + return; + } + waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); + if (!taskInstance.retryTaskIntervalOverTime()) { + logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", + processInstance.getId(), + newTaskInstance.getTaskCode(), + newTaskInstance.getState(), + newTaskInstance.getRetryTimes(), + newTaskInstance.getMaxRetryTimes(), + newTaskInstance.getRetryInterval()); + stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance); + stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance); + } else { + addTaskToStandByList(newTaskInstance); + submitStandByTask(); + waitToRetryTaskInstanceMap.remove(newTaskInstance.getTaskCode()); + } + } + + /** + * handle task retry event + * @param stateEvent + * @return + */ + private boolean taskRetryEventHandler(StateEvent stateEvent) { + TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode()); + addTaskToStandByList(taskInstance); + submitStandByTask(); + waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode()); + return true; + } + /** * update process instance */ @@ -492,9 +550,9 @@ public class WorkflowExecuteThread { processService.packageTaskInstance(taskInstance, processInstance); taskInstanceMap.put(taskInstance.getId(), taskInstance); - validTaskMap.remove(Long.toString(taskInstance.getTaskCode())); + validTaskMap.remove(taskInstance.getTaskCode()); if (Flag.YES == taskInstance.getFlag()) { - validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId()); + validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); } } @@ -561,6 +619,20 @@ public class WorkflowExecuteThread { return null; } + public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) { + if (activeTaskProcessorMaps.containsKey(taskCode)) { + return activeTaskProcessorMaps.get(taskCode).taskInstance(); + } + return null; + } + + public TaskInstance getRetryTaskInstanceByTaskCode(long taskCode) { + if (waitToRetryTaskInstanceMap.containsKey(taskCode)) { + return waitToRetryTaskInstanceMap.get(taskCode); + } + return null; + } + private boolean processStateChangeHandler(StateEvent stateEvent) { try { logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus()); @@ -736,7 +808,7 @@ public class WorkflowExecuteThread { taskNodeList.forEach(taskNode -> { if (taskNode.isForbidden()) { - forbiddenTaskMap.put(Long.toString(taskNode.getCode()), taskNode); + forbiddenTaskMap.put(taskNode.getCode(), taskNode); } }); @@ -767,17 +839,39 @@ public class WorkflowExecuteThread { if (!isNewProcessInstance()) { List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); for (TaskInstance task : validTaskInstanceList) { - validTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + if (validTaskMap.containsKey(task.getTaskCode())) { + int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); + TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); + if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) { + task.setFlag(Flag.NO); + processService.updateTaskInstance(task); + continue; + } + logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", task.getTaskCode()); + } + + validTaskMap.put(task.getTaskCode(), task.getId()); taskInstanceMap.put(task.getId(), task); if (task.isTaskComplete()) { - completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + completeTaskMap.put(task.getTaskCode(), task.getId()); + continue; } if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) { continue; } - if (task.getState().typeIsFailure() && !task.taskCanRetry()) { - errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + if (task.taskCanRetry()) { + if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) { + // tolerantTaskInstance add to standby list directly + TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task); + addTaskToStandByList(tolerantTaskInstance); + } else { + retryTaskInstance(task); + } + continue; + } + if (task.getState().typeIsFailure()) { + errorTaskMap.put(task.getTaskCode(), task.getId()); } } } @@ -832,19 +926,32 @@ public class WorkflowExecuteThread { taskInstance.getId(), taskInstance.getName()); return null; } - validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId()); + + // in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid + if (validTaskMap.containsKey(taskInstance.getTaskCode())) { + int oldTaskInstanceId = validTaskMap.get(taskInstance.getTaskCode()); + if (taskInstance.getId() != oldTaskInstanceId) { + TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); + oldTaskInstance.setFlag(Flag.NO); + processService.updateTaskInstance(oldTaskInstance); + validTaskMap.remove(taskInstance.getTaskCode()); + activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); + } + } + + validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); taskInstanceMap.put(taskInstance.getId(), taskInstance); - activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor); + activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor); taskProcessor.action(TaskAction.RUN); - stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance); - stateWheelExecuteThread.addTask4RetryCheck(taskInstance); + stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance); + stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance); - if (taskProcessor.taskState().typeIsFinished()) { + if (taskProcessor.taskInstance().getState().typeIsFinished()) { StateEvent stateEvent = new StateEvent(); stateEvent.setProcessInstanceId(this.processInstance.getId()); stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setExecutionStatus(taskProcessor.taskState()); + stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState()); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); this.stateEvents.add(stateEvent); } @@ -898,78 +1005,129 @@ public class WorkflowExecuteThread { */ private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), taskNode.getVersion()); - if (taskInstance == null) { - taskInstance = new TaskInstance(); - taskInstance.setTaskCode(taskNode.getCode()); - taskInstance.setTaskDefinitionVersion(taskNode.getVersion()); - // task name - taskInstance.setName(taskNode.getName()); - // task instance state - taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - // process instance id - taskInstance.setProcessInstanceId(processInstance.getId()); - // task instance type - taskInstance.setTaskType(taskNode.getType().toUpperCase()); - // task instance whether alert - taskInstance.setAlertFlag(Flag.NO); - - // task instance start time - taskInstance.setStartTime(null); - - // task instance flag - taskInstance.setFlag(Flag.YES); - - // task dry run flag - taskInstance.setDryRun(processInstance.getDryRun()); - - // task instance retry times - taskInstance.setRetryTimes(0); - - // max task instance retry times - taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes()); - - // retry task instance interval - taskInstance.setRetryInterval(taskNode.getRetryInterval()); - - //set task param - taskInstance.setTaskParams(taskNode.getTaskParams()); - - //set task group and priority - taskInstance.setTaskGroupId(taskNode.getTaskGroupId()); - taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority()); - - // task instance priority - if (taskNode.getTaskInstancePriority() == null) { - taskInstance.setTaskInstancePriority(Priority.MEDIUM); - } else { - taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority()); - } + if (taskInstance != null) { + return taskInstance; + } + + return newTaskInstance(processInstance, taskNode); + } + + /** + * clone a new taskInstance for retry and reset some logic fields + * @return + */ + public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) { + TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); + if (taskNode == null) { + logger.error("taskNode is null, code:{}", taskInstance.getTaskCode()); + return null; + } + TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); + newTaskInstance.setTaskDefine(taskInstance.getTaskDefine()); + newTaskInstance.setProcessDefine(taskInstance.getProcessDefine()); + newTaskInstance.setProcessInstance(processInstance); + newTaskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1); + // todo relative funtion: TaskInstance.retryTaskIntervalOverTime + newTaskInstance.setState(taskInstance.getState()); + newTaskInstance.setEndTime(taskInstance.getEndTime()); + return newTaskInstance; + } - String processWorkerGroup = processInstance.getWorkerGroup(); - processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup; - String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup(); + /** + * clone a new taskInstance for tolerant and reset some logic fields + * @return + */ + public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) { + TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); + if (taskNode == null) { + logger.error("taskNode is null, code:{}", taskInstance.getTaskCode()); + return null; + } + TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); + newTaskInstance.setTaskDefine(taskInstance.getTaskDefine()); + newTaskInstance.setProcessDefine(taskInstance.getProcessDefine()); + newTaskInstance.setProcessInstance(processInstance); + newTaskInstance.setRetryTimes(taskInstance.getRetryTimes()); + newTaskInstance.setState(taskInstance.getState()); + return newTaskInstance; + } - Long processEnvironmentCode = Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode(); - Long taskEnvironmentCode = Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode(); + /** + * new a taskInstance + * @param processInstance + * @param taskNode + * @return + */ + public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setTaskCode(taskNode.getCode()); + taskInstance.setTaskDefinitionVersion(taskNode.getVersion()); + // task name + taskInstance.setName(taskNode.getName()); + // task instance state + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + // process instance id + taskInstance.setProcessInstanceId(processInstance.getId()); + // task instance type + taskInstance.setTaskType(taskNode.getType().toUpperCase()); + // task instance whether alert + taskInstance.setAlertFlag(Flag.NO); + + // task instance start time + taskInstance.setStartTime(null); + + // task instance flag + taskInstance.setFlag(Flag.YES); + + // task dry run flag + taskInstance.setDryRun(processInstance.getDryRun()); + + // task instance retry times + taskInstance.setRetryTimes(0); + + // max task instance retry times + taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes()); + + // retry task instance interval + taskInstance.setRetryInterval(taskNode.getRetryInterval()); + + //set task param + taskInstance.setTaskParams(taskNode.getTaskParams()); + + //set task group and priority + taskInstance.setTaskGroupId(taskNode.getTaskGroupId()); + taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority()); + + // task instance priority + if (taskNode.getTaskInstancePriority() == null) { + taskInstance.setTaskInstancePriority(Priority.MEDIUM); + } else { + taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority()); + } - if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) { - taskInstance.setWorkerGroup(processWorkerGroup); - taskInstance.setEnvironmentCode(processEnvironmentCode); - } else { - taskInstance.setWorkerGroup(taskWorkerGroup); - taskInstance.setEnvironmentCode(taskEnvironmentCode); - } + String processWorkerGroup = processInstance.getWorkerGroup(); + processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup; + String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup(); - if (!taskInstance.getEnvironmentCode().equals(-1L)) { - Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode()); - if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) { - taskInstance.setEnvironmentConfig(environment.getConfig()); - } - } - // delay execution time - taskInstance.setDelayTime(taskNode.getDelayTime()); + Long processEnvironmentCode = Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode(); + Long taskEnvironmentCode = Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode(); + + if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) { + taskInstance.setWorkerGroup(processWorkerGroup); + taskInstance.setEnvironmentCode(processEnvironmentCode); + } else { + taskInstance.setWorkerGroup(taskWorkerGroup); + taskInstance.setEnvironmentCode(taskEnvironmentCode); } + if (!taskInstance.getEnvironmentCode().equals(-1L)) { + Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode()); + if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) { + taskInstance.setEnvironmentConfig(environment.getConfig()); + } + } + // delay execution time + taskInstance.setDelayTime(taskNode.getDelayTime()); return taskInstance; } @@ -978,7 +1136,7 @@ public class WorkflowExecuteThread { Map allTaskInstance = new HashMap<>(); if (CollectionUtils.isNotEmpty(preTask)) { for (String preTaskCode : preTask) { - Integer taskId = completeTaskMap.get(preTaskCode); + Integer taskId = completeTaskMap.get(Long.parseLong(preTaskCode)); if (taskId == null) { continue; } @@ -1073,7 +1231,7 @@ public class WorkflowExecuteThread { continue; } - if (completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) { + if (task.getId() > 0 && completeTaskMap.containsKey(task.getTaskCode())) { logger.info("task {} has already run success", task.getName()); continue; } @@ -1106,10 +1264,11 @@ public class WorkflowExecuteThread { for (String depsNode : indirectDepCodeList) { if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) { // dependencies must be fully completed - if (!completeTaskMap.containsKey(depsNode)) { + Long despNodeTaskCode = Long.parseLong(depsNode); + if (!completeTaskMap.containsKey(despNodeTaskCode)) { return DependResult.WAITING; } - Integer depsTaskId = completeTaskMap.get(depsNode); + Integer depsTaskId = completeTaskMap.get(despNodeTaskCode); ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState(); if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) { return DependResult.NON_EXEC; @@ -1138,7 +1297,7 @@ public class WorkflowExecuteThread { TaskNode taskNode = dag.getNode(taskCode); List depCodeList = taskNode.getDepList(); for (String depsNode : depCodeList) { - if (forbiddenTaskMap.containsKey(depsNode)) { + if (forbiddenTaskMap.containsKey(Long.parseLong(depsNode))) { setIndirectDepList(depsNode, indirectDepCodeList); } else { indirectDepCodeList.add(depsNode); @@ -1157,7 +1316,8 @@ public class WorkflowExecuteThread { return false; } } else { - Integer taskInstanceId = completeTaskMap.get(dependNodeName); + long taskCode = Long.parseLong(dependNodeName); + Integer taskInstanceId = completeTaskMap.get(taskCode); ExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState(); if (depTaskState.typeIsFailure()) { return false; @@ -1228,7 +1388,9 @@ public class WorkflowExecuteThread { return true; } if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { - return readyToSubmitTaskQueue.size() == 0 && activeTaskProcessorMaps.size() == 0; + return readyToSubmitTaskQueue.size() == 0 + && activeTaskProcessorMaps.size() == 0 + && waitToRetryTaskInstanceMap.size() == 0; } } return false; @@ -1310,7 +1472,7 @@ public class WorkflowExecuteThread { // success if (state == ExecutionStatus.RUNNING_EXECUTION) { List killTasks = getCompleteTaskByState(ExecutionStatus.KILL); - if (readyToSubmitTaskQueue.size() > 0) { + if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) { //tasks currently pending submission, no retries, indicating that depend is waiting to complete return ExecutionStatus.RUNNING_EXECUTION; } else if (CollectionUtils.isNotEmpty(killTasks)) { @@ -1445,19 +1607,23 @@ public class WorkflowExecuteThread { private void killAllTasks() { logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(), activeTaskProcessorMaps.size()); - for (int taskId : activeTaskProcessorMaps.keySet()) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskId); + for (long taskCode : activeTaskProcessorMaps.keySet()) { + ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode); + Integer taskInstanceId = validTaskMap.get(taskCode); + if (taskInstanceId == null || taskInstanceId.equals(0)) { + continue; + } + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); if (taskInstance == null || taskInstance.getState().typeIsFinished()) { continue; } - ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId); taskProcessor.action(TaskAction.STOP); - if (taskProcessor.taskState().typeIsFinished()) { + if (taskProcessor.taskInstance().getState().typeIsFinished()) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); stateEvent.setProcessInstanceId(this.processInstance.getId()); stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setExecutionStatus(taskProcessor.taskState()); + stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState()); this.addStateEvent(stateEvent); } } @@ -1485,7 +1651,7 @@ public class WorkflowExecuteThread { task.setState(retryTask.getState()); logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); removeTaskFromStandbyList(task); - completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + completeTaskMap.put(task.getTaskCode(), task.getId()); taskInstanceMap.put(task.getId(), task); submitPostNode(Long.toString(task.getTaskCode())); continue; @@ -1499,21 +1665,15 @@ public class WorkflowExecuteThread { } DependResult dependResult = getDependResultForTask(task); if (DependResult.SUCCESS == dependResult) { - if (task.retryTaskIntervalOverTime()) { - int originalId = task.getId(); - TaskInstance taskInstance = submitTaskExec(task); - if (taskInstance == null) { - this.taskFailedSubmit = true; - } else { - removeTaskFromStandbyList(task); - if (taskInstance.getId() != originalId) { - activeTaskProcessorMaps.remove(originalId); - } - } + TaskInstance taskInstance = submitTaskExec(task); + if (taskInstance == null) { + this.taskFailedSubmit = true; + } else { + removeTaskFromStandbyList(task); } } else if (DependResult.FAILED == dependResult) { // if the dependency fails, the current node is not submitted and the state changes to failure. - dependFailedTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + dependFailedTaskMap.put(task.getTaskCode(), task.getId()); removeTaskFromStandbyList(task); logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult); } else if (DependResult.NON_EXEC == dependResult) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 45870558f4..f9fb9c9314 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -62,6 +62,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @Autowired private StateEventCallbackService stateEventCallbackService; + @Autowired + private StateWheelExecuteThread stateWheelExecuteThread; + /** * multi-thread filter, avoid handling workflow at the same time */ @@ -119,6 +122,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @Override public void onSuccess(Object result) { if (workflowExecuteThread.workFlowFinish()) { + stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance()); processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); notifyProcessChanged(workflowExecuteThread.getProcessInstance()); logger.info("process instance {} finished.", processInstanceId); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 6e67d487d6..deb0166955 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -204,8 +204,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return null; } - public ExecutionStatus taskState() { - return this.taskInstance.getState(); + public TaskInstance taskInstance() { + return this.taskInstance; } /** diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 9646285a82..bb639a3ada 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -71,11 +71,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { return true; } - @Override - public ExecutionStatus taskState() { - return this.taskInstance.getState(); - } - @Override public boolean runTask() { if (conditionResult.equals(DependResult.WAITING)) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 0f89a44124..a2bbae7393 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -89,11 +89,6 @@ public class DependentTaskProcessor extends BaseTaskProcessor { return true; } - @Override - public ExecutionStatus taskState() { - return this.taskInstance.getState(); - } - @Override public boolean runTask() { if (!allDependentItemFinished) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java index d1f3c4c727..d7d241b7f6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.runner.task; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -32,6 +31,6 @@ public interface ITaskProcessor { String getType(); - ExecutionStatus taskState(); + TaskInstance taskInstance(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 5a6355696c..19217af2fd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -64,11 +64,6 @@ public class SubTaskProcessor extends BaseTaskProcessor { return true; } - @Override - public ExecutionStatus taskState() { - return this.taskInstance.getState(); - } - @Override public boolean runTask() { try { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 912822b173..b194eac71c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -77,7 +77,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { @Override public boolean runTask() { try { - if (!this.taskState().typeIsFinished() && setSwitchResult()) { + if (!this.taskInstance().getState().typeIsFinished() && setSwitchResult()) { endTaskState(); } } catch (Exception e) { @@ -120,11 +120,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { return TaskType.SWITCH.getDesc(); } - @Override - public ExecutionStatus taskState() { - return this.taskInstance.getState(); - } - private boolean setSwitchResult() { List taskInstances = processService.findValidTaskListByProcessId( taskInstance.getProcessInstanceId() diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java new file mode 100644 index 0000000000..a5d7707576 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import java.util.Objects; + +/** + * task instance key, processInstanceId + */ +public class TaskInstanceKey { + private int processInstanceId; + private long taskCode; + private int taskVersion; + + public TaskInstanceKey(int processInstanceId, long taskCode, int taskVersion) { + this.processInstanceId = processInstanceId; + this.taskCode = taskCode; + this.taskVersion = taskVersion; + } + + public int getProcessInstanceId() { + return processInstanceId; + } + + public long getTaskCode() { + return taskCode; + } + + public int getTaskVersion() { + return taskVersion; + } + + public static TaskInstanceKey getTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) { + if (processInstance == null || taskInstance == null) { + return null; + } + return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); + } + + @Override + public String toString() { + return "TaskKey{" + + "processInstanceId=" + processInstanceId + + ", taskCode=" + taskCode + + ", taskVersion=" + taskVersion + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskInstanceKey taskInstanceKey = (TaskInstanceKey) o; + return processInstanceId == taskInstanceKey.processInstanceId && taskCode == taskInstanceKey.taskCode && taskVersion == taskInstanceKey.taskVersion; + } + + @Override + public int hashCode() { + return Objects.hash(processInstanceId, taskCode, taskVersion); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 3d18498f8b..5907a81368 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -156,7 +156,7 @@ public class WorkflowExecuteThreadTest { Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4"); Mockito.when(processService.findTaskInstanceByIdList( - Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId())) + Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId())) ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4)); Class masterExecThreadClass = WorkflowExecuteThread.class; Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class); @@ -198,9 +198,9 @@ public class WorkflowExecuteThreadTest { taskInstanceMap.put(taskInstance1.getId(), taskInstance1); taskInstanceMap.put(taskInstance2.getId(), taskInstance2); - Map completeTaskList = new ConcurrentHashMap<>(); - completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance1.getId()); - completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance2.getId()); + Map completeTaskList = new ConcurrentHashMap<>(); + completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId()); + completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId()); Class masterExecThreadClass = WorkflowExecuteThread.class; @@ -216,7 +216,7 @@ public class WorkflowExecuteThreadTest { Assert.assertNotNull(taskInstance.getVarPool()); taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]"); - completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId()); + completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId()); completeTaskMapField.setAccessible(true); completeTaskMapField.set(workflowExecuteThread, completeTaskList); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 5cebcf7ad7..6d49860513 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -114,6 +114,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.SerializationUtils; import java.util.ArrayList; import java.util.Arrays; @@ -1378,30 +1379,11 @@ public class ProcessService { */ public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) { ExecutionStatus processInstanceState = processInstance.getState(); - - if (taskInstance.getState().typeIsFailure()) { - if (taskInstance.isSubProcess()) { - taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1); - } else { - if (processInstanceState != ExecutionStatus.READY_STOP - && processInstanceState != ExecutionStatus.READY_PAUSE) { - // failure task set invalid - taskInstance.setFlag(Flag.NO); - updateTaskInstance(taskInstance); - // crate new task instance - if (taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE) { - taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1); - } - taskInstance.setSubmitTime(null); - taskInstance.setLogPath(null); - taskInstance.setExecutePath(null); - taskInstance.setStartTime(null); - taskInstance.setEndTime(null); - taskInstance.setFlag(Flag.YES); - taskInstance.setHost(null); - taskInstance.setId(0); - } - } + if (processInstanceState.typeIsFinished() + || processInstanceState == ExecutionStatus.READY_PAUSE + || processInstanceState == ExecutionStatus.READY_STOP) { + logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState); + return null; } taskInstance.setExecutorId(processInstance.getExecutorId()); taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());