Browse Source

[Bug-7865][MasterServer] retry logic optimization (#8156)

* submit task optimization

* cloneAndReset

* update

* tolerant task restart when init

* fix snoar check

* fix test

* delete unuse file

* taskInstance key

* code style

* skip complete when retry

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
b5fa54b6be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
  2. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
  3. 13
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  4. 205
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  5. 508
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  6. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  7. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  8. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  9. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
  10. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
  11. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
  12. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
  13. 83
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
  14. 10
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
  15. 30
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java

@ -35,6 +35,8 @@ public class StateEvent {
private int taskInstanceId;
private long taskCode;
private int processInstanceId;
private String context;
@ -53,6 +55,10 @@ public class StateEvent {
return taskInstanceId;
}
public long getTaskCode() {
return taskCode;
}
public int getProcessInstanceId() {
return processInstanceId;
}
@ -73,6 +79,10 @@ public class StateEvent {
this.taskInstanceId = taskInstanceId;
}
public void setTaskCode(long taskCode) {
this.taskCode = taskCode;
}
public Channel getChannel() {
return channel;
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java

@ -26,6 +26,7 @@ public enum StateEventType {
PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout"),
WAIT_TASK_GROUP(4, "wait task group"),
TASK_RETRY(5, "task retry")
;
StateEventType(int code, String descp) {

13
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.commons.lang3.SerializationUtils;
import java.io.Serializable;
import java.util.Date;
import java.util.Map;
@ -599,7 +601,8 @@ public class TaskInstance implements Serializable {
}
/**
* determine if you can try again
* determine if a task instance can retry
* if subProcess,
*
* @return can try result
*/
@ -609,10 +612,8 @@ public class TaskInstance implements Serializable {
}
if (this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
return true;
} else {
return (this.getState().typeIsFailure()
&& this.getRetryTimes() < this.getMaxRetryTimes());
}
return this.getState() == ExecutionStatus.FAILURE && (this.getRetryTimes() < this.getMaxRetryTimes());
}
/**
@ -624,9 +625,7 @@ public class TaskInstance implements Serializable {
if (getState() != ExecutionStatus.FAILURE) {
return true;
}
if (getId() == 0
|| getMaxRetryTimes() == 0
|| getRetryInterval() == 0) {
if (getMaxRetryTimes() == 0 || getRetryInterval() == 0) {
return true;
}
Date now = new Date();

205
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.hadoop.util.ThreadUtil;
import java.util.Map.Entry;
@ -42,8 +44,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 1. timeout check wheel
* 2. dependent task check wheel
* Check thread
* 1. timeout task check
* 2. dependent task state check
* 3. retry task check
* 4. timeout process check
*/
@Component
public class StateWheelExecuteThread extends Thread {
@ -56,14 +61,19 @@ public class StateWheelExecuteThread extends Thread {
private ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/**
* task time out check list, key is taskInstanceId, value is processInstanceId
* task time out check list
*/
private ConcurrentHashMap<Integer, Integer> taskInstanceTimeoutCheckList = new ConcurrentHashMap<>();
private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/**
* task retry check list, key is taskInstanceId, value is processInstanceId
* task retry check list
*/
private ConcurrentHashMap<Integer, Integer> taskInstanceRetryCheckList = new ConcurrentHashMap<>();
private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
/**
* task state check list
*/
private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
@Autowired
private MasterConfig masterConfig;
@ -80,6 +90,7 @@ public class StateWheelExecuteThread extends Thread {
try {
checkTask4Timeout();
checkTask4Retry();
checkTask4State();
checkProcess4Timeout();
} catch (Exception e) {
logger.error("state wheel thread check error:", e);
@ -96,8 +107,39 @@ public class StateWheelExecuteThread extends Thread {
processInstanceTimeoutCheckList.remove(processInstance.getId());
}
public void addTask4TimeoutCheck(TaskInstance taskInstance) {
if (taskInstanceTimeoutCheckList.containsKey(taskInstance.getId())) {
private void checkProcess4Timeout() {
if (processInstanceTimeoutCheckList.isEmpty()) {
return;
}
for (Integer processInstanceId : processInstanceTimeoutCheckList) {
if (processInstanceId == null) {
continue;
}
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
processInstanceTimeoutCheckList.remove(processInstanceId);
continue;
}
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance == null) {
continue;
}
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
}
}
}
public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
@ -106,19 +148,29 @@ public class StateWheelExecuteThread extends Thread {
return;
}
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
taskInstanceTimeoutCheckList.add(taskInstanceKey);
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
taskInstanceTimeoutCheckList.add(taskInstanceKey);
}
}
public void removeTask4TimeoutCheck(TaskInstance taskInstance) {
taskInstanceTimeoutCheckList.remove(taskInstance.getId());
public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
}
public void addTask4RetryCheck(TaskInstance taskInstance) {
if (taskInstanceRetryCheckList.containsKey(taskInstance.getId())) {
public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
if (taskInstanceRetryCheckList.contains(taskInstanceKey)) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
@ -126,43 +178,69 @@ public class StateWheelExecuteThread extends Thread {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
}
if (taskInstance.taskCanRetry()) {
taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId());
taskInstanceRetryCheckList.add(taskInstanceKey);
}
public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
taskInstanceRetryCheckList.remove(taskInstanceKey);
}
public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
return;
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
taskInstanceStateCheckList.add(taskInstanceKey);
}
}
public void removeTask4RetryCheck(TaskInstance taskInstance) {
taskInstanceRetryCheckList.remove(taskInstance.getId());
public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
taskInstanceStateCheckList.remove(taskInstanceKey);
}
private void checkTask4Timeout() {
if (taskInstanceTimeoutCheckList.isEmpty()) {
return;
}
for (Entry<Integer, Integer> entry : taskInstanceTimeoutCheckList.entrySet()) {
int processInstanceId = entry.getValue();
int taskInstanceId = entry.getKey();
for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
processInstanceId, taskInstanceId);
taskInstanceTimeoutCheckList.remove(taskInstanceId);
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (taskInstance == null) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
addTaskTimeoutEvent(taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstance.getId());
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
}
}
}
@ -172,54 +250,63 @@ public class StateWheelExecuteThread extends Thread {
if (taskInstanceRetryCheckList.isEmpty()) {
return;
}
for (Entry<Integer, Integer> entry : taskInstanceRetryCheckList.entrySet()) {
int processInstanceId = entry.getValue();
int taskInstanceId = entry.getKey();
for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
processInstanceId, taskInstanceId);
taskInstanceRetryCheckList.remove(taskInstanceId);
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
if (taskInstance == null) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
if (!taskInstance.getState().typeIsFinished() && (taskInstance.isSubProcess() || taskInstance.isDependTask())) {
addTaskStateChangeEvent(taskInstance);
} else if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
addTaskStateChangeEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstance.getId());
if (taskInstance.retryTaskIntervalOverTime()) {
// reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
taskInstance.setEndTime(null);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
addTaskRetryEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey);
}
}
}
private void checkProcess4Timeout() {
if (processInstanceTimeoutCheckList.isEmpty()) {
private void checkTask4State() {
if (taskInstanceStateCheckList.isEmpty()) {
return;
}
for (Integer processInstanceId : processInstanceTimeoutCheckList) {
if (processInstanceId == null) {
continue;
}
for (TaskInstanceKey taskInstanceKey : taskInstanceStateCheckList) {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
processInstanceTimeoutCheckList.remove(processInstanceId);
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance == null) {
TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (taskInstance == null) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
if (taskInstance.getState().typeIsFinished()) {
continue;
}
addTaskStateChangeEvent(taskInstance);
}
}
@ -228,6 +315,17 @@ public class StateWheelExecuteThread extends Thread {
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setTaskCode(taskInstance.getTaskCode());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
private void addTaskRetryEvent(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_RETRY);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setTaskCode(taskInstance.getTaskCode());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
@ -237,6 +335,7 @@ public class StateWheelExecuteThread extends Thread {
stateEvent.setType(StateEventType.TASK_TIMEOUT);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setTaskCode(taskInstance.getTaskCode());
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}

508
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -159,34 +159,38 @@ public class WorkflowExecuteThread {
private Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
/**
* running TaskNode, taskId as key
* running taskProcessor, taskCode as key, taskProcessor as value
* only on taskProcessor per taskCode
*/
private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
private final Map<Long, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
/**
* valid task map, taskCode as key, taskId as value
* in a DAG, only one taskInstance per taskCode is valid
*/
private Map<String, Integer> validTaskMap = new ConcurrentHashMap<>();
private Map<Long, Integer> validTaskMap = new ConcurrentHashMap<>();
/**
* error task map, taskCode as key, taskId as value
* error task map, taskCode as key, taskInstanceId as value
* in a DAG, only one taskInstance per taskCode is valid
*/
private Map<String, Integer> errorTaskMap = new ConcurrentHashMap<>();
private Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
/**
* complete task map, taskCode as key, taskId as value
* complete task map, taskCode as key, taskInstanceId as value
* in a DAG, only one taskInstance per taskCode is valid
*/
private Map<String, Integer> completeTaskMap = new ConcurrentHashMap<>();
private Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
/**
* depend failed task map, taskCode as key, taskId as value
*/
private Map<String, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
private Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
/**
* forbidden task map, code as key
*/
private Map<String, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
private Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
/**
* skip task map, code as key
@ -208,6 +212,12 @@ public class WorkflowExecuteThread {
*/
private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
* wait to retry taskInstance map, taskCode as key, taskInstance as value
* before retry, the taskInstance id is 0
*/
private Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
/**
* state wheel execute thread
*/
@ -317,6 +327,9 @@ public class WorkflowExecuteThread {
case WAIT_TASK_GROUP:
result = checkForceStartAndWakeUp(stateEvent);
break;
case TASK_RETRY:
result = taskRetryEventHandler(stateEvent);
break;
default:
break;
}
@ -330,8 +343,8 @@ public class WorkflowExecuteThread {
private boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
taskProcessor.init(taskInstance, processInstance);
taskProcessor.action(TaskAction.DISPATCH);
@ -341,8 +354,8 @@ public class WorkflowExecuteThread {
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue);
if (acquireTaskGroup) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
taskProcessor.init(taskInstance, processInstance);
taskProcessor.action(TaskAction.DISPATCH);
@ -363,7 +376,7 @@ public class WorkflowExecuteThread {
}
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.TIMEOUT);
} else {
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, taskInstance.getTaskDefine());
@ -387,79 +400,57 @@ public class WorkflowExecuteThread {
return true;
}
if (task.getState().typeIsFinished() && !completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
if (task.getState().typeIsFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
return true;
}
taskFinished(task);
if (task.getTaskGroupId() > 0) {
//release task group
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(task);
if (nextTaskInstance != null) {
if (nextTaskInstance.getProcessInstanceId() == task.getProcessInstanceId()) {
StateEvent nextEvent = new StateEvent();
nextEvent.setProcessInstanceId(this.processInstance.getId());
nextEvent.setTaskInstanceId(nextTaskInstance.getId());
nextEvent.setType(StateEventType.WAIT_TASK_GROUP);
this.stateEvents.add(nextEvent);
} else {
ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
releaseTaskGroup(task);
}
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
return true;
}
if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode());
iTaskProcessor.action(TaskAction.RUN);
if (iTaskProcessor.taskState().typeIsFinished()) {
task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
taskFinished(task);
}
} else {
logger.error("state handler error: {}", stateEvent);
return true;
}
logger.error("state handler error: {}", stateEvent);
return true;
}
private void taskFinished(TaskInstance task) {
logger.info("work flow {} task {} state:{} ",
private void taskFinished(TaskInstance taskInstance) {
logger.info("work flow {} task id:{} code:{} state:{} ",
processInstance.getId(),
task.getId(),
task.getState());
if (task.taskCanRetry()) {
addTaskToStandByList(task);
if (!task.retryTaskIntervalOverTime()) {
logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}",
processInstance.getId(),
task.getId(),
task.getState(),
task.getRetryTimes(),
task.getMaxRetryTimes(),
task.getRetryInterval());
stateWheelExecuteThread.addTask4TimeoutCheck(task);
stateWheelExecuteThread.addTask4RetryCheck(task);
} else {
submitStandByTask();
stateWheelExecuteThread.removeTask4TimeoutCheck(task);
stateWheelExecuteThread.removeTask4RetryCheck(task);
}
return;
}
taskInstance.getId(),
taskInstance.getTaskCode(),
taskInstance.getState());
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
activeTaskProcessorMaps.remove(task.getId());
stateWheelExecuteThread.removeTask4TimeoutCheck(task);
stateWheelExecuteThread.removeTask4RetryCheck(task);
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
if (task.getState().typeIsSuccess()) {
processInstance.setVarPool(task.getVarPool());
if (taskInstance.getState().typeIsSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
submitPostNode(Long.toString(task.getTaskCode()));
} else if (task.getState().typeIsFailure()) {
if (task.isConditionsTask()
|| DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
submitPostNode(Long.toString(task.getTaskCode()));
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else if (taskInstance.taskCanRetry()) {
// retry task
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (taskInstance.isConditionsTask()
|| DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@ -468,6 +459,73 @@ public class WorkflowExecuteThread {
this.updateProcessInstanceState();
}
/**
* release task group
* @param taskInstance
*/
private void releaseTaskGroup(TaskInstance taskInstance) {
if (taskInstance.getTaskGroupId() > 0) {
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
if (nextTaskInstance != null) {
if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) {
StateEvent nextEvent = new StateEvent();
nextEvent.setProcessInstanceId(this.processInstance.getId());
nextEvent.setTaskInstanceId(nextTaskInstance.getId());
nextEvent.setType(StateEventType.WAIT_TASK_GROUP);
this.stateEvents.add(nextEvent);
} else {
ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
}
}
/**
* crate new task instance to retry, different objects from the original
* @param taskInstance
*/
private void retryTaskInstance(TaskInstance taskInstance) {
if (!taskInstance.taskCanRetry()) {
return;
}
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) {
logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
return;
}
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) {
logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
processInstance.getId(),
newTaskInstance.getTaskCode(),
newTaskInstance.getState(),
newTaskInstance.getRetryTimes(),
newTaskInstance.getMaxRetryTimes(),
newTaskInstance.getRetryInterval());
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
} else {
addTaskToStandByList(newTaskInstance);
submitStandByTask();
waitToRetryTaskInstanceMap.remove(newTaskInstance.getTaskCode());
}
}
/**
* handle task retry event
* @param stateEvent
* @return
*/
private boolean taskRetryEventHandler(StateEvent stateEvent) {
TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode());
addTaskToStandByList(taskInstance);
submitStandByTask();
waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode());
return true;
}
/**
* update process instance
*/
@ -492,9 +550,9 @@ public class WorkflowExecuteThread {
processService.packageTaskInstance(taskInstance, processInstance);
taskInstanceMap.put(taskInstance.getId(), taskInstance);
validTaskMap.remove(Long.toString(taskInstance.getTaskCode()));
validTaskMap.remove(taskInstance.getTaskCode());
if (Flag.YES == taskInstance.getFlag()) {
validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
}
}
@ -561,6 +619,20 @@ public class WorkflowExecuteThread {
return null;
}
public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) {
if (activeTaskProcessorMaps.containsKey(taskCode)) {
return activeTaskProcessorMaps.get(taskCode).taskInstance();
}
return null;
}
public TaskInstance getRetryTaskInstanceByTaskCode(long taskCode) {
if (waitToRetryTaskInstanceMap.containsKey(taskCode)) {
return waitToRetryTaskInstanceMap.get(taskCode);
}
return null;
}
private boolean processStateChangeHandler(StateEvent stateEvent) {
try {
logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
@ -736,7 +808,7 @@ public class WorkflowExecuteThread {
taskNodeList.forEach(taskNode -> {
if (taskNode.isForbidden()) {
forbiddenTaskMap.put(Long.toString(taskNode.getCode()), taskNode);
forbiddenTaskMap.put(taskNode.getCode(), taskNode);
}
});
@ -767,17 +839,39 @@ public class WorkflowExecuteThread {
if (!isNewProcessInstance()) {
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
if (validTaskMap.containsKey(task.getTaskCode())) {
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) {
task.setFlag(Flag.NO);
processService.updateTaskInstance(task);
continue;
}
logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", task.getTaskCode());
}
validTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
if (task.isTaskComplete()) {
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
completeTaskMap.put(task.getTaskCode(), task.getId());
continue;
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
continue;
}
if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
if (task.taskCanRetry()) {
if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
// tolerantTaskInstance add to standby list directly
TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
addTaskToStandByList(tolerantTaskInstance);
} else {
retryTaskInstance(task);
}
continue;
}
if (task.getState().typeIsFailure()) {
errorTaskMap.put(task.getTaskCode(), task.getId());
}
}
}
@ -832,19 +926,32 @@ public class WorkflowExecuteThread {
taskInstance.getId(), taskInstance.getName());
return null;
}
validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
// in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid
if (validTaskMap.containsKey(taskInstance.getTaskCode())) {
int oldTaskInstanceId = validTaskMap.get(taskInstance.getTaskCode());
if (taskInstance.getId() != oldTaskInstanceId) {
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
oldTaskInstance.setFlag(Flag.NO);
processService.updateTaskInstance(oldTaskInstance);
validTaskMap.remove(taskInstance.getTaskCode());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
}
}
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
taskProcessor.action(TaskAction.RUN);
stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance);
stateWheelExecuteThread.addTask4RetryCheck(taskInstance);
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);
if (taskProcessor.taskState().typeIsFinished()) {
if (taskProcessor.taskInstance().getState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(taskProcessor.taskState());
stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(stateEvent);
}
@ -898,78 +1005,129 @@ public class WorkflowExecuteThread {
*/
private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), taskNode.getVersion());
if (taskInstance == null) {
taskInstance = new TaskInstance();
taskInstance.setTaskCode(taskNode.getCode());
taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
// task name
taskInstance.setName(taskNode.getName());
// task instance state
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
// process instance id
taskInstance.setProcessInstanceId(processInstance.getId());
// task instance type
taskInstance.setTaskType(taskNode.getType().toUpperCase());
// task instance whether alert
taskInstance.setAlertFlag(Flag.NO);
// task instance start time
taskInstance.setStartTime(null);
// task instance flag
taskInstance.setFlag(Flag.YES);
// task dry run flag
taskInstance.setDryRun(processInstance.getDryRun());
// task instance retry times
taskInstance.setRetryTimes(0);
// max task instance retry times
taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes());
// retry task instance interval
taskInstance.setRetryInterval(taskNode.getRetryInterval());
//set task param
taskInstance.setTaskParams(taskNode.getTaskParams());
//set task group and priority
taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
// task instance priority
if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
} else {
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
}
if (taskInstance != null) {
return taskInstance;
}
return newTaskInstance(processInstance, taskNode);
}
/**
* clone a new taskInstance for retry and reset some logic fields
* @return
*/
public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
if (taskNode == null) {
logger.error("taskNode is null, code:{}", taskInstance.getTaskCode());
return null;
}
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode);
newTaskInstance.setTaskDefine(taskInstance.getTaskDefine());
newTaskInstance.setProcessDefine(taskInstance.getProcessDefine());
newTaskInstance.setProcessInstance(processInstance);
newTaskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime
newTaskInstance.setState(taskInstance.getState());
newTaskInstance.setEndTime(taskInstance.getEndTime());
return newTaskInstance;
}
String processWorkerGroup = processInstance.getWorkerGroup();
processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup;
String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
/**
* clone a new taskInstance for tolerant and reset some logic fields
* @return
*/
public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
if (taskNode == null) {
logger.error("taskNode is null, code:{}", taskInstance.getTaskCode());
return null;
}
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode);
newTaskInstance.setTaskDefine(taskInstance.getTaskDefine());
newTaskInstance.setProcessDefine(taskInstance.getProcessDefine());
newTaskInstance.setProcessInstance(processInstance);
newTaskInstance.setRetryTimes(taskInstance.getRetryTimes());
newTaskInstance.setState(taskInstance.getState());
return newTaskInstance;
}
Long processEnvironmentCode = Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode();
Long taskEnvironmentCode = Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode();
/**
* new a taskInstance
* @param processInstance
* @param taskNode
* @return
*/
public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskCode(taskNode.getCode());
taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
// task name
taskInstance.setName(taskNode.getName());
// task instance state
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
// process instance id
taskInstance.setProcessInstanceId(processInstance.getId());
// task instance type
taskInstance.setTaskType(taskNode.getType().toUpperCase());
// task instance whether alert
taskInstance.setAlertFlag(Flag.NO);
// task instance start time
taskInstance.setStartTime(null);
// task instance flag
taskInstance.setFlag(Flag.YES);
// task dry run flag
taskInstance.setDryRun(processInstance.getDryRun());
// task instance retry times
taskInstance.setRetryTimes(0);
// max task instance retry times
taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes());
// retry task instance interval
taskInstance.setRetryInterval(taskNode.getRetryInterval());
//set task param
taskInstance.setTaskParams(taskNode.getTaskParams());
//set task group and priority
taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
// task instance priority
if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
} else {
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
}
if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
taskInstance.setWorkerGroup(processWorkerGroup);
taskInstance.setEnvironmentCode(processEnvironmentCode);
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
taskInstance.setEnvironmentCode(taskEnvironmentCode);
}
String processWorkerGroup = processInstance.getWorkerGroup();
processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup;
String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
if (!taskInstance.getEnvironmentCode().equals(-1L)) {
Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) {
taskInstance.setEnvironmentConfig(environment.getConfig());
}
}
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
Long processEnvironmentCode = Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode();
Long taskEnvironmentCode = Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode();
if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
taskInstance.setWorkerGroup(processWorkerGroup);
taskInstance.setEnvironmentCode(processEnvironmentCode);
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
taskInstance.setEnvironmentCode(taskEnvironmentCode);
}
if (!taskInstance.getEnvironmentCode().equals(-1L)) {
Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) {
taskInstance.setEnvironmentConfig(environment.getConfig());
}
}
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
return taskInstance;
}
@ -978,7 +1136,7 @@ public class WorkflowExecuteThread {
Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) {
for (String preTaskCode : preTask) {
Integer taskId = completeTaskMap.get(preTaskCode);
Integer taskId = completeTaskMap.get(Long.parseLong(preTaskCode));
if (taskId == null) {
continue;
}
@ -1073,7 +1231,7 @@ public class WorkflowExecuteThread {
continue;
}
if (completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
if (task.getId() > 0 && completeTaskMap.containsKey(task.getTaskCode())) {
logger.info("task {} has already run success", task.getName());
continue;
}
@ -1106,10 +1264,11 @@ public class WorkflowExecuteThread {
for (String depsNode : indirectDepCodeList) {
if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) {
// dependencies must be fully completed
if (!completeTaskMap.containsKey(depsNode)) {
Long despNodeTaskCode = Long.parseLong(depsNode);
if (!completeTaskMap.containsKey(despNodeTaskCode)) {
return DependResult.WAITING;
}
Integer depsTaskId = completeTaskMap.get(depsNode);
Integer depsTaskId = completeTaskMap.get(despNodeTaskCode);
ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
return DependResult.NON_EXEC;
@ -1138,7 +1297,7 @@ public class WorkflowExecuteThread {
TaskNode taskNode = dag.getNode(taskCode);
List<String> depCodeList = taskNode.getDepList();
for (String depsNode : depCodeList) {
if (forbiddenTaskMap.containsKey(depsNode)) {
if (forbiddenTaskMap.containsKey(Long.parseLong(depsNode))) {
setIndirectDepList(depsNode, indirectDepCodeList);
} else {
indirectDepCodeList.add(depsNode);
@ -1157,7 +1316,8 @@ public class WorkflowExecuteThread {
return false;
}
} else {
Integer taskInstanceId = completeTaskMap.get(dependNodeName);
long taskCode = Long.parseLong(dependNodeName);
Integer taskInstanceId = completeTaskMap.get(taskCode);
ExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
if (depTaskState.typeIsFailure()) {
return false;
@ -1228,7 +1388,9 @@ public class WorkflowExecuteThread {
return true;
}
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
return readyToSubmitTaskQueue.size() == 0 && activeTaskProcessorMaps.size() == 0;
return readyToSubmitTaskQueue.size() == 0
&& activeTaskProcessorMaps.size() == 0
&& waitToRetryTaskInstanceMap.size() == 0;
}
}
return false;
@ -1310,7 +1472,7 @@ public class WorkflowExecuteThread {
// success
if (state == ExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
if (readyToSubmitTaskQueue.size() > 0) {
if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
//tasks currently pending submission, no retries, indicating that depend is waiting to complete
return ExecutionStatus.RUNNING_EXECUTION;
} else if (CollectionUtils.isNotEmpty(killTasks)) {
@ -1445,19 +1607,23 @@ public class WorkflowExecuteThread {
private void killAllTasks() {
logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
activeTaskProcessorMaps.size());
for (int taskId : activeTaskProcessorMaps.keySet()) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
for (long taskCode : activeTaskProcessorMaps.keySet()) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode);
Integer taskInstanceId = validTaskMap.get(taskCode);
if (taskInstanceId == null || taskInstanceId.equals(0)) {
continue;
}
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
if (taskInstance == null || taskInstance.getState().typeIsFinished()) {
continue;
}
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId);
taskProcessor.action(TaskAction.STOP);
if (taskProcessor.taskState().typeIsFinished()) {
if (taskProcessor.taskInstance().getState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(taskProcessor.taskState());
stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
this.addStateEvent(stateEvent);
}
}
@ -1485,7 +1651,7 @@ public class WorkflowExecuteThread {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
completeTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
submitPostNode(Long.toString(task.getTaskCode()));
continue;
@ -1499,21 +1665,15 @@ public class WorkflowExecuteThread {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
if (task.retryTaskIntervalOverTime()) {
int originalId = task.getId();
TaskInstance taskInstance = submitTaskExec(task);
if (taskInstance == null) {
this.taskFailedSubmit = true;
} else {
removeTaskFromStandbyList(task);
if (taskInstance.getId() != originalId) {
activeTaskProcessorMaps.remove(originalId);
}
}
TaskInstance taskInstance = submitTaskExec(task);
if (taskInstance == null) {
this.taskFailedSubmit = true;
} else {
removeTaskFromStandbyList(task);
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
dependFailedTaskMap.put(task.getTaskCode(), task.getId());
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
} else if (DependResult.NON_EXEC == dependResult) {

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -62,6 +62,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Autowired
private StateEventCallbackService stateEventCallbackService;
@Autowired
private StateWheelExecuteThread stateWheelExecuteThread;
/**
* multi-thread filter, avoid handling workflow at the same time
*/
@ -119,6 +122,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onSuccess(Object result) {
if (workflowExecuteThread.workFlowFinish()) {
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(workflowExecuteThread.getProcessInstance());
logger.info("process instance {} finished.", processInstanceId);

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -204,8 +204,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return null;
}
public ExecutionStatus taskState() {
return this.taskInstance.getState();
public TaskInstance taskInstance() {
return this.taskInstance;
}
/**

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -71,11 +71,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
}
@Override
public boolean runTask() {
if (conditionResult.equals(DependResult.WAITING)) {

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java

@ -89,11 +89,6 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
}
@Override
public boolean runTask() {
if (!allDependentItemFinished) {

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -32,6 +31,6 @@ public interface ITaskProcessor {
String getType();
ExecutionStatus taskState();
TaskInstance taskInstance();
}

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@ -64,11 +64,6 @@ public class SubTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
}
@Override
public boolean runTask() {
try {

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -77,7 +77,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
@Override
public boolean runTask() {
try {
if (!this.taskState().typeIsFinished() && setSwitchResult()) {
if (!this.taskInstance().getState().typeIsFinished() && setSwitchResult()) {
endTaskState();
}
} catch (Exception e) {
@ -120,11 +120,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return TaskType.SWITCH.getDesc();
}
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
}
private boolean setSwitchResult() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId()

83
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Objects;
/**
* task instance key, processInstanceId
*/
public class TaskInstanceKey {
private int processInstanceId;
private long taskCode;
private int taskVersion;
public TaskInstanceKey(int processInstanceId, long taskCode, int taskVersion) {
this.processInstanceId = processInstanceId;
this.taskCode = taskCode;
this.taskVersion = taskVersion;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public long getTaskCode() {
return taskCode;
}
public int getTaskVersion() {
return taskVersion;
}
public static TaskInstanceKey getTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) {
if (processInstance == null || taskInstance == null) {
return null;
}
return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
}
@Override
public String toString() {
return "TaskKey{" +
"processInstanceId=" + processInstanceId +
", taskCode=" + taskCode +
", taskVersion=" + taskVersion +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskInstanceKey taskInstanceKey = (TaskInstanceKey) o;
return processInstanceId == taskInstanceKey.processInstanceId && taskCode == taskInstanceKey.taskCode && taskVersion == taskInstanceKey.taskVersion;
}
@Override
public int hashCode() {
return Objects.hash(processInstanceId, taskCode, taskVersion);
}
}

10
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -156,7 +156,7 @@ public class WorkflowExecuteThreadTest {
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
Mockito.when(processService.findTaskInstanceByIdList(
Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
@ -198,9 +198,9 @@ public class WorkflowExecuteThreadTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
Map<String, Integer> completeTaskList = new ConcurrentHashMap<>();
completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance1.getId());
completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance2.getId());
Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId());
completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
@ -216,7 +216,7 @@ public class WorkflowExecuteThreadTest {
Assert.assertNotNull(taskInstance.getVarPool());
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId());
completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
completeTaskMapField.setAccessible(true);
completeTaskMapField.set(workflowExecuteThread, completeTaskList);

30
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -114,6 +114,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.SerializationUtils;
import java.util.ArrayList;
import java.util.Arrays;
@ -1378,30 +1379,11 @@ public class ProcessService {
*/
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
ExecutionStatus processInstanceState = processInstance.getState();
if (taskInstance.getState().typeIsFailure()) {
if (taskInstance.isSubProcess()) {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
} else {
if (processInstanceState != ExecutionStatus.READY_STOP
&& processInstanceState != ExecutionStatus.READY_PAUSE) {
// failure task set invalid
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
// crate new task instance
if (taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE) {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
}
taskInstance.setSubmitTime(null);
taskInstance.setLogPath(null);
taskInstance.setExecutePath(null);
taskInstance.setStartTime(null);
taskInstance.setEndTime(null);
taskInstance.setFlag(Flag.YES);
taskInstance.setHost(null);
taskInstance.setId(0);
}
}
if (processInstanceState.typeIsFinished()
|| processInstanceState == ExecutionStatus.READY_PAUSE
|| processInstanceState == ExecutionStatus.READY_STOP) {
logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState);
return null;
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());

Loading…
Cancel
Save