Browse Source

fix timeout (#7222)

Co-authored-by: caishunfeng <534328519@qq.com>
2.0.7-release
wind 3 years ago committed by GitHub
parent
commit
c2d0acd713
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  2. 101
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  3. 105
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  4. 7
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -108,6 +108,11 @@ public class MasterSchedulerService extends Thread {
*/
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
/**
* task retry check list
*/
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
/**
* key:code-version
* value: processDefinition
@ -127,6 +132,7 @@ public class MasterSchedulerService extends Thread {
stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList,
taskTimeoutCheckList,
taskRetryCheckList,
this.processInstanceExecMaps,
masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
}
@ -176,8 +182,6 @@ public class MasterSchedulerService extends Thread {
/**
* 1. get command by slot
* 2. do not handle command if slot is empty
*
* @throws Exception
*/
private void scheduleProcess() throws Exception {
@ -201,7 +205,8 @@ public class MasterSchedulerService extends Thread {
, nettyExecutorManager
, processAlertManager
, masterConfig
, taskTimeoutCheckList);
, taskTimeoutCheckList
, taskRetryCheckList);
this.processInstanceExecMaps.put(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {

101
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -42,18 +42,21 @@ public class StateWheelExecuteThread extends Thread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
ConcurrentHashMap<Integer, ProcessInstance> processInstanceCheckList;
ConcurrentHashMap<Integer, TaskInstance> taskInstanceCheckList;
private ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList;
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList;
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
private int stateCheckIntervalSecs;
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstances,
ConcurrentHashMap<Integer, TaskInstance> taskInstances,
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
int stateCheckIntervalSecs) {
this.processInstanceCheckList = processInstances;
this.taskInstanceCheckList = taskInstances;
this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList;
this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList;
this.taskInstanceRetryCheckList = taskInstanceRetryCheckList;
this.processInstanceExecMaps = processInstanceExecMaps;
this.stateCheckIntervalSecs = stateCheckIntervalSecs;
}
@ -64,8 +67,9 @@ public class StateWheelExecuteThread extends Thread {
logger.info("state wheel thread start");
while (Stopper.isRunning()) {
try {
checkProcess();
checkTask();
checkTask4Timeout();
checkTask4Retry();
checkProcess4Timeout();
} catch (Exception e) {
logger.error("state wheel thread check error:", e);
}
@ -73,85 +77,96 @@ public class StateWheelExecuteThread extends Thread {
}
}
public boolean addProcess(ProcessInstance processInstance) {
this.processInstanceCheckList.put(processInstance.getId(), processInstance);
return true;
/**
 * Registers a process instance in the process timeout check list, keyed by its id.
 * An entry already stored under the same id is replaced.
 *
 * @param processInstance process instance to watch
 */
public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
    processInstanceTimeoutCheckList.put(
            processInstance.getId(),
            processInstance);
}
public boolean addTask(TaskInstance taskInstance) {
this.taskInstanceCheckList.put(taskInstance.getId(), taskInstance);
return true;
/**
 * Registers a task instance in the task timeout check list, keyed by its id.
 * An entry already stored under the same id is replaced.
 *
 * @param taskInstance task instance to watch
 */
public void addTask4TimeoutCheck(TaskInstance taskInstance) {
    taskInstanceTimeoutCheckList.put(
            taskInstance.getId(),
            taskInstance);
}
/**
 * Registers a task instance in the task retry check list, keyed by its id.
 * An entry already stored under the same id is replaced.
 *
 * @param taskInstance task instance to watch
 */
public void addTask4RetryCheck(TaskInstance taskInstance) {
    taskInstanceRetryCheckList.put(
            taskInstance.getId(),
            taskInstance);
}
private void checkTask() {
if (taskInstanceCheckList.isEmpty()) {
public void checkTask4Timeout() {
if (taskInstanceTimeoutCheckList.isEmpty()) {
return;
}
for (TaskInstance taskInstance : this.taskInstanceCheckList.values()) {
for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) {
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 >= timeRemain && processTimeout(taskInstance)) {
taskInstanceCheckList.remove(taskInstance.getId());
if (0 >= timeRemain) {
addTaskTimeoutEvent(taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstance.getId());
}
}
}
}
private void checkTask4Retry() {
if (taskInstanceRetryCheckList.isEmpty()) {
return;
}
for (TaskInstance taskInstance : this.taskInstanceRetryCheckList.values()) {
if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
processDependCheck(taskInstance);
taskInstanceCheckList.remove(taskInstance.getId());
addTaskStateChangeEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstance.getId());
}
if (taskInstance.isSubProcess() || taskInstance.isDependTask()) {
processDependCheck(taskInstance);
addTaskStateChangeEvent(taskInstance);
}
}
}
private void checkProcess() {
if (processInstanceCheckList.isEmpty()) {
private void checkProcess4Timeout() {
if (processInstanceTimeoutCheckList.isEmpty()) {
return;
}
for (ProcessInstance processInstance : this.processInstanceCheckList.values()) {
for (ProcessInstance processInstance : this.processInstanceTimeoutCheckList.values()) {
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 <= timeRemain && processTimeout(processInstance)) {
processInstanceCheckList.remove(processInstance.getId());
if (0 >= timeRemain) {
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
}
}
}
/**
 * Hands a state event to the workflow execute thread registered for the event's process instance.
 * Nothing happens when no execute thread is registered under that process instance id.
 *
 * @param stateEvent event to deliver; its process instance id is the lookup key
 */
private void putEvent(StateEvent stateEvent) {
    // One get() instead of containsKey() followed by get(): the map is a ConcurrentHashMap
    // handed in through the constructor, so an entry removed between the two calls would
    // make get() return null and the call below would throw a NullPointerException.
    WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId());
    if (workflowExecuteThread == null) {
        return;
    }
    workflowExecuteThread.addStateEvent(stateEvent);
}
private boolean processDependCheck(TaskInstance taskInstance) {
private boolean addTaskStateChangeEvent(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
putEvent(stateEvent);
addEvent(stateEvent);
return true;
}
private boolean processTimeout(TaskInstance taskInstance) {
private boolean addTaskTimeoutEvent(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_TIMEOUT);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
putEvent(stateEvent);
addEvent(stateEvent);
return true;
}
private boolean processTimeout(ProcessInstance processInstance) {
private boolean addProcessTimeoutEvent(ProcessInstance processInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.PROCESS_TIMEOUT);
stateEvent.setProcessInstanceId(processInstance.getId());
putEvent(stateEvent);
addEvent(stateEvent);
return true;
}
/**
 * Routes a state event to the workflow execute thread registered for the event's process instance.
 * The event is dropped when no execute thread is registered under that process instance id.
 *
 * @param stateEvent event to deliver; its process instance id is the lookup key
 */
private void addEvent(StateEvent stateEvent) {
    // Look the thread up by key. ConcurrentHashMap#contains(Object) is the legacy alias of
    // containsValue(Object), so testing the process instance id with it never matched and
    // every event was silently discarded. A single get() also avoids a check-then-act gap.
    WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId());
    if (workflowExecuteThread == null) {
        return;
    }
    workflowExecuteThread.addStateEvent(stateEvent);
}
}

105
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -17,13 +17,25 @@
package org.apache.dolphinscheduler.server.master.runner;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -33,7 +45,13 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
@ -46,14 +64,29 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.dolphinscheduler.common.Constants.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
/**
* master exec thread,split dag
@ -153,9 +186,13 @@ public class WorkflowExecuteThread implements Runnable {
private ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList;
/**
* task retry check list
*/
private ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList;
/**
* start flag, true: start nodes submit completely
*
*/
private boolean isStart = false;
@ -165,14 +202,14 @@ public class WorkflowExecuteThread implements Runnable {
* @param processInstance processInstance
* @param processService processService
* @param nettyExecutorManager nettyExecutorManager
* @param taskTimeoutCheckList
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
, ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList) {
, ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
, ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList) {
this.processService = processService;
this.processInstance = processInstance;
@ -180,6 +217,7 @@ public class WorkflowExecuteThread implements Runnable {
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.taskTimeoutCheckList = taskTimeoutCheckList;
this.taskRetryCheckList = taskRetryCheckList;
}
@Override
@ -197,7 +235,6 @@ public class WorkflowExecuteThread implements Runnable {
/**
* the process start nodes are submitted completely.
* @return
*/
public boolean isStart() {
return this.isStart;
@ -296,11 +333,10 @@ public class WorkflowExecuteThread implements Runnable {
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
taskProcessor.action(TaskAction.TIMEOUT);
return false;
} else {
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, taskInstance.getTaskDefine());
return true;
}
return true;
}
private boolean processTimeout() {
@ -342,6 +378,7 @@ public class WorkflowExecuteThread implements Runnable {
task.getMaxRetryTimes(),
task.getRetryInterval());
this.addTimeoutCheck(task);
this.addRetryCheck(task);
} else {
submitStandByTask();
}
@ -351,6 +388,7 @@ public class WorkflowExecuteThread implements Runnable {
completeTaskList.put(Long.toString(task.getTaskCode()), task);
activeTaskProcessorMaps.remove(task.getId());
taskTimeoutCheckList.remove(task.getId());
taskRetryCheckList.remove(task.getId());
if (task.getState().typeIsSuccess()) {
processInstance.setVarPool(task.getVarPool());
processService.saveProcessInstance(processInstance);
@ -497,7 +535,7 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.getProcessDefinitionVersion());
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
List<TaskNode> taskNodeList =
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
forbiddenTaskList.clear();
taskNodeList.forEach(taskNode -> {
@ -585,6 +623,7 @@ public class WorkflowExecuteThread implements Runnable {
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
taskProcessor.run();
addTimeoutCheck(taskInstance);
addRetryCheck(taskInstance);
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
@ -635,15 +674,34 @@ public class WorkflowExecuteThread implements Runnable {
taskInstance.getTaskDefinitionVersion()
);
taskInstance.setTaskDefine(taskDefinition);
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) {
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
} else {
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
}
}
}
/**
 * Adds a task instance to the retry check list when it qualifies for retry checking.
 * <p>
 * The task is skipped when it is already in the list, or when it has no task definition
 * attached (an error is logged in that case). Otherwise it is added if it can be retried,
 * or if it is a depend task or a sub process.
 *
 * @param taskInstance task instance to register
 */
private void addRetryCheck(TaskInstance taskInstance) {
    boolean alreadyRegistered = taskRetryCheckList.containsKey(taskInstance.getId());
    if (alreadyRegistered) {
        return;
    }

    if (taskInstance.getTaskDefine() == null) {
        logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
        return;
    }

    // Evaluate the predicates in the original order; either one leads to the same put.
    boolean retryable = taskInstance.taskCanRetry();
    boolean dependOrSubProcess = taskInstance.isDependTask() || taskInstance.isSubProcess();
    if (retryable || dependOrSubProcess) {
        taskRetryCheckList.put(taskInstance.getId(), taskInstance);
    }
}
/**
* find task instance in db.
* in case more than one task with the same name is submitted at the same time.
@ -991,7 +1049,6 @@ public class WorkflowExecuteThread implements Runnable {
/**
* generate the latest process instance status by the tasks state
*
* @param instance
* @return process instance execution status
*/
private ExecutionStatus getProcessInstanceState(ProcessInstance instance) {

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -22,8 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -34,7 +32,6 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
@ -47,7 +44,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -105,7 +101,8 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList));
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);

Loading…
Cancel
Save