Browse Source

[Fix-10842] Fix master/worker failover will cause status incorrect (#10839)

* Fix master failover will not update task instance status
* Add some failover log
* Fix worker failover will rerun task more than once
* Fix workflowInstance failover may rerun already success taskInstance
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
3f69ec8f28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java
  2. 32
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  3. 45
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
  4. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  5. 153
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
  6. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
  7. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  8. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  9. 125
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  10. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  11. 371
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
  12. 253
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  13. 266
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
  14. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
  15. 4
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  16. 22
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  17. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  18. 42
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  19. 41
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
  20. 5
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java

@ -498,5 +498,16 @@ public class DAG<Node, NodeInfo, EdgeInfo> {
return new AbstractMap.SimpleEntry<>(notZeroIndegreeNodeMap.size() == 0, topoResultList);
}
/**
 * Renders this DAG as {@code DAG{nodesMap=..., edgesMap=..., reverseEdgesMap=...}}.
 *
 * @return a textual description built from the three internal maps
 */
@Override
public String toString() {
    // Build the description piece by piece; append(Object) renders null as "null",
    // exactly like string concatenation does.
    StringBuilder rendered = new StringBuilder("DAG{");
    rendered.append("nodesMap=").append(nodesMap);
    rendered.append(", edgesMap=").append(edgesMap);
    rendered.append(", reverseEdgesMap=").append(reverseEdgesMap);
    rendered.append('}');
    return rendered.toString();
}
}

32
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -34,11 +34,11 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections.CollectionUtils;
@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -187,8 +188,24 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
TaskMetrics.incTaskDispatch();
boolean result = false;
try {
WorkflowExecuteRunnable workflowExecuteRunnable =
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
if (workflowExecuteRunnable == null) {
logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
return true;
}
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
if (!taskInstanceOptional.isPresent()) {
logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
taskPriority);
// we return true, so that we will drop this task.
return true;
}
TaskInstance taskInstance = taskInstanceOptional.get();
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup());
ExecutionContext executionContext =
new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance);
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
@ -196,16 +213,21 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
return true;
}
}
result = dispatcher.dispatch(executionContext);
if (result) {
logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
taskPriority.getTaskId(),
executionContext.getHost());
addDispatchEvent(context, executionContext);
} else {
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
taskPriority.getTaskId(),
executionContext.getHost());
}
} catch (RuntimeException | ExecuteException e) {
logger.error("Master dispatch task to worker error: ", e);
logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
}
return result;
}

45
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java

@ -19,13 +19,17 @@ package org.apache.dolphinscheduler.server.master.dispatch.context;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import lombok.Data;
/**
* execution context
*/
@Data
public class ExecutionContext {
/**
@ -34,51 +38,30 @@ public class ExecutionContext {
private Host host;
/**
* command
* command
*/
private final Command command;
private final TaskInstance taskInstance;
/**
* executor type : worker or client
* executor type : worker or client
*/
private final ExecutorType executorType;
/**
* worker group
* worker group
*/
private String workerGroup;
private final String workerGroup;
public ExecutionContext(Command command, ExecutorType executorType) {
this(command, executorType, DEFAULT_WORKER_GROUP);
public ExecutionContext(Command command, ExecutorType executorType, TaskInstance taskInstance) {
this(command, executorType, DEFAULT_WORKER_GROUP, taskInstance);
}
public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) {
public ExecutionContext(Command command, ExecutorType executorType, String workerGroup, TaskInstance taskInstance) {
this.command = command;
this.executorType = executorType;
this.workerGroup = workerGroup;
}
public Command getCommand() {
return command;
}
public ExecutorType getExecutorType() {
return executorType;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public String getWorkerGroup() {
return this.workerGroup;
}
public Host getHost() {
return host;
}
public void setHost(Host host) {
this.host = host;
this.taskInstance = taskInstance;
}
}

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -117,6 +117,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
doExecute(host, command);
success = true;
context.setHost(host);
// Set the host on the taskInstance here; otherwise, if the worker goes down, this taskInstance might not be failed over,
// because its host would not match the down worker (ISSUE-10842).
context.getTaskInstance().setHost(host.getAddress());
} catch (ExecuteException ex) {
logger.error(String.format("execute command : %s error", command), ex);
try {

153
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -27,6 +26,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
@ -76,7 +76,7 @@ public class TaskExecuteRunnable implements Runnable {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
persist(event);
} catch (Exception e) {
logger.error("persist error, event:{}, error: {}", event, e);
logger.error("persist task event error, event:{}", event, e);
} finally {
this.events.remove(event);
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@ -114,37 +114,44 @@ public class TaskExecuteRunnable implements Runnable {
*
* @param taskEvent taskEvent
*/
private void persist(TaskEvent taskEvent) {
private void persist(TaskEvent taskEvent) throws Exception {
Event event = taskEvent.getEvent();
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
Optional<TaskInstance> taskInstance;
WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
} else {
taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
}
boolean needToSendEvent = true;
switch (event) {
case DISPATCH:
handleDispatchEvent(taskEvent, taskInstance);
needToSendEvent = handleDispatchEvent(taskEvent, taskInstance);
// dispatch event do not need to submit state event
return;
break;
case DELAY:
case RUNNING:
handleRunningEvent(taskEvent, taskInstance);
needToSendEvent = handleRunningEvent(taskEvent, taskInstance);
break;
case RESULT:
handleResultEvent(taskEvent, taskInstance);
needToSendEvent = handleResultEvent(taskEvent, taskInstance);
break;
case WORKER_REJECT:
handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteRunnable);
needToSendEvent =
handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteRunnable);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
if (!needToSendEvent) {
logger.info("Handle task event: {} success, there is no need to send a StateEvent", taskEvent);
return;
}
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
@ -157,101 +164,101 @@ public class TaskExecuteRunnable implements Runnable {
/**
* handle dispatch event
*/
private void handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
private boolean handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
if (!taskInstanceOptional.isPresent()) {
logger.error("taskInstance is null");
return;
return false;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
return;
return false;
}
taskInstance.setState(ExecutionStatus.DISPATCH);
taskInstance.setHost(taskEvent.getWorkerAddress());
processService.saveTaskInstance(taskInstance);
return true;
}
/**
* handle running event
*/
private void handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
private boolean handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
try {
if (taskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
} else {
taskInstance.setState(taskEvent.getState());
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
processService.saveTaskInstance(taskInstance);
}
}
// if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
// send ack to worker
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
} catch (Exception e) {
logger.error("handle worker ack master error", e);
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
}
}
/**
* handle result event
*/
private void handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
try {
if (taskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceOptional.get();
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
if (taskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}",
taskInstance.getId(),
taskInstance.getState());
return false;
} else {
taskInstance.setState(taskEvent.getState());
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
taskInstance.setState(taskEvent.getState());
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
processService.saveTaskInstance(taskInstance);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
} catch (Exception e) {
logger.error("handle worker response master error", e);
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
// if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
// send ack to worker
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
return true;
}
/**
* handle result event
*/
private void handleWorkerRejectEvent(Channel channel, Optional<TaskInstance> taskInstanceOptional, WorkflowExecuteRunnable executeThread) {
TaskInstance taskInstance = taskInstanceOptional.orElseThrow(() -> new RuntimeException("taskInstance is null"));
try {
if (executeThread != null) {
executeThread.resubmit(taskInstance.getTaskCode());
}
if (channel != null) {
TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId());
channel.writeAndFlush(taskRecallAckCommand.convert2Command());
private boolean handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
if (taskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
logger.warn("The current taskInstance has already been finished, taskEvent: {}", taskEvent);
return false;
}
} catch (Exception e) {
logger.error("handle worker reject error", e);
TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance.getId());
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
taskInstance.setState(taskEvent.getState());
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
processService.saveTaskInstance(taskInstance);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
return true;
}
/**
 * Handle a worker reject event: resubmit the task through its workflow runnable and
 * acknowledge the recall to the worker.
 *
 * @param channel              channel used to send the recall ack; no ack is sent when it is null
 * @param taskInstanceOptional the task instance the event refers to; must be present
 * @param executeThread        workflow runnable used to resubmit the task; resubmission is skipped when null
 * @return always {@code true}
 * @throws Exception if resubmitting the task fails; a {@link RuntimeException} is thrown when the
 *                   task instance is absent
 */
private boolean handleWorkerRejectEvent(Channel channel,
                                        Optional<TaskInstance> taskInstanceOptional,
                                        WorkflowExecuteRunnable executeThread) throws Exception {
    // Without a task instance there is nothing to resubmit or acknowledge.
    if (!taskInstanceOptional.isPresent()) {
        throw new RuntimeException("taskInstance is null");
    }
    TaskInstance rejectedTask = taskInstanceOptional.get();

    if (executeThread != null) {
        executeThread.resubmit(rejectedTask.getTaskCode());
    }

    if (channel == null) {
        return true;
    }
    // Tell the worker that the recall has been accepted.
    TaskRecallAckCommand recallAck =
            new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), rejectedTask.getId());
    channel.writeAndFlush(recallAck.convert2Command());
    return true;
}
}

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java

@ -64,6 +64,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
break;
case REMOVE:
masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true);
break;
default:
break;

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,7 +42,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
* failover service
*/
@Autowired
private FailoverService failoverService;
private MasterFailoverService masterFailoverService;
protected FailoverExecuteThread() {
super("FailoverExecuteThread");
@ -63,7 +64,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
try {
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
failoverService.checkMasterFailover();
masterFailoverService.checkMasterFailover();
} catch (Exception e) {
logger.error("Master failover thread execute error", e);
} finally {

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
@ -31,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import java.util.Optional;
@ -296,15 +296,21 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
}
if (!taskInstanceOptional.isPresent()) {
logger.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
logger.warn(
"Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.retryTaskIntervalOverTime()) {
// Check the state first: a task that worker failover has already marked for fault tolerance is resubmitted by the
// failover flow, so this thread must not resubmit it as well
if (taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE
&& taskInstance.retryTaskIntervalOverTime()) {
// reset taskInstance endTime and state
// todo related functions: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance",
taskInstance.getId());
taskInstance.setEndTime(null);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);

125
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -108,6 +108,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import com.google.common.collect.Lists;
@ -141,7 +142,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* process instance
*/
private ProcessInstance processInstance;
private final ProcessInstance processInstance;
/**
* process definition
@ -298,6 +299,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
StateEventHandler stateEventHandler =
StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
.orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
logger.info("Begin to handle state event, {}", stateEvent);
if (stateEventHandler.handleStateEvent(this, stateEvent)) {
this.stateEvents.remove(stateEvent);
}
@ -483,8 +485,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
public void refreshProcessInstance(int processInstanceId) {
logger.info("process instance update: {}", processInstanceId);
processInstance = processService.findProcessInstanceById(processInstanceId);
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
ProcessInstance newProcessInstance = processService.findProcessInstanceById(processInstanceId);
// only update the fields of the existing processInstance object (this is a shallow copy)
BeanUtils.copyProperties(newProcessInstance, processInstance);
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
}
@ -770,6 +776,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
logger.info("Build dag success, dag: {}", dag);
}
/**
@ -784,45 +791,60 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
errorTaskMap.clear();
if (!isNewProcessInstance()) {
logger.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}",
processInstance.getRunTimes(),
processInstance.getRecovery());
List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
if (validTaskMap.containsKey(task.getTaskCode())) {
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) {
task.setFlag(Flag.NO);
processService.updateTaskInstance(task);
continue;
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
logger.info(
"Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}",
task.getTaskCode(),
task.getState());
if (validTaskMap.containsKey(task.getTaskCode())) {
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) {
task.setFlag(Flag.NO);
processService.updateTaskInstance(task);
continue;
}
logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
task.getTaskCode());
}
logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
task.getTaskCode());
}
validTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
validTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
if (task.isTaskComplete()) {
completeTaskMap.put(task.getTaskCode(), task.getId());
continue;
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
continue;
}
if (task.taskCanRetry()) {
if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
// tolerantTaskInstance add to standby list directly
TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
addTaskToStandByList(tolerantTaskInstance);
} else {
retryTaskInstance(task);
if (task.isTaskComplete()) {
completeTaskMap.put(task.getTaskCode(), task.getId());
continue;
}
continue;
}
if (task.getState().typeIsFailure()) {
errorTaskMap.put(task.getTaskCode(), task.getId());
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
dag)) {
continue;
}
if (task.taskCanRetry()) {
if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
// tolerantTaskInstance add to standby list directly
TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
addTaskToStandByList(tolerantTaskInstance);
} else {
retryTaskInstance(task);
}
continue;
}
if (task.getState().typeIsFailure()) {
errorTaskMap.put(task.getTaskCode(), task.getId());
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
} else {
logger.info("The current workflowInstance is a newly running workflowInstance");
}
if (processInstance.isComplementData() && complementListDate.isEmpty()) {
@ -849,15 +871,22 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0));
String globalParams =
curingParamsService.curingGlobalParams(processInstance.getId(), processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA,
processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE));
String globalParams = curingParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA,
processInstance.getScheduleTime(),
cmdParam.get(Constants.SCHEDULE_TIMEZONE));
processInstance.setGlobalParams(globalParams);
processService.updateProcessInstance(processInstance);
}
}
}
}
logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}",
dependFailedTaskMap,
completeTaskMap,
errorTaskMap);
}
/**
@ -899,6 +928,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!",
processInstance.getId(),
processInstance.getName(),
taskInstance.getId(),
taskInstance.getName());
return Optional.empty();
}
taskProcessor.action(TaskAction.RUN);
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
@ -1816,14 +1854,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* is new process instance
*/
private boolean isNewProcessInstance() {
if (Flag.YES.equals(processInstance.getRecovery())) {
logger.info("This workInstance will be recover by this execution");
return false;
}
if (ExecutionStatus.RUNNING_EXECUTION == processInstance.getState() && processInstance.getRunTimes() == 1) {
return true;
} else if (processInstance.getRecovery().equals(Flag.YES)) {
// host is empty use old task instance
return false;
} else {
return false;
}
logger.info(
"The workflowInstance has been executed before, this execution is to reRun, processInstance status: {}, runTimes: {}",
processInstance.getState(),
processInstance.getRunTimes());
return false;
}
public void resubmit(long taskCode) throws Exception {

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -67,7 +67,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return true;
}
}
dispatchTask();
return true;
}
@ -119,7 +118,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId());
logger.info("task ready to dispatch to worker: taskInstanceId: {}", taskInstance.getId());
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
@ -167,7 +166,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);

371
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java

@ -17,46 +17,12 @@
package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
/**
@ -65,41 +31,14 @@ import lombok.NonNull;
@Component
public class FailoverService {
private static final Logger LOGGER = LoggerFactory.getLogger(FailoverService.class);
private final RegistryClient registryClient;
private final MasterConfig masterConfig;
private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
private final ProcessInstanceExecCacheManager cacheManager;
private final String localAddress;
public FailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
@NonNull ProcessInstanceExecCacheManager cacheManager) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
this.cacheManager = cacheManager;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
}
/**
* check master failover
*/
@Counted(value = "ds.master.scheduler.failover.check.count")
@Timed(value = "ds.master.scheduler.failover.check.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
public void checkMasterFailover() {
List<String> hosts = getNeedFailoverMasterServers();
if (CollectionUtils.isEmpty(hosts)) {
return;
}
LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, hosts);
private final MasterFailoverService masterFailoverService;
private final WorkerFailoverService workerFailoverService;
for (String host : hosts) {
failoverMasterWithLock(host);
}
public FailoverService(@NonNull MasterFailoverService masterFailoverService,
@NonNull WorkerFailoverService workerFailoverService) {
this.masterFailoverService = masterFailoverService;
this.workerFailoverService = workerFailoverService;
}
/**
@ -111,304 +50,18 @@ public class FailoverService {
public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
    // Delegate exactly once per node type to the dedicated failover service.
    // Calling the legacy helper as well would run the same failover twice.
    switch (nodeType) {
        case MASTER:
            LOGGER.info("Master failover starting, masterServer: {}", serverHost);
            masterFailoverService.failoverMaster(serverHost);
            LOGGER.info("Master failover finished, masterServer: {}", serverHost);
            break;
        case WORKER:
            LOGGER.info("Worker failover starting, workerServer: {}", serverHost);
            workerFailoverService.failoverWorker(serverHost);
            LOGGER.info("Worker failover finished, workerServer: {}", serverHost);
            break;
        default:
            // Other node types have nothing to fail over.
            break;
    }
}
/**
 * Runs the master failover for one host while holding that host's failover lock.
 * Any exception raised while locking or failing over is logged and not rethrown;
 * the lock release is always attempted.
 *
 * @param masterHost host of the master to fail over
 */
private void failoverMasterWithLock(String masterHost) {
    final String lockPath = getFailoverLockPath(NodeType.MASTER, masterHost);
    try {
        registryClient.getLock(lockPath);
        this.failoverMaster(masterHost);
    } catch (Exception e) {
        LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
    } finally {
        registryClient.releaseLock(lockPath);
    }
}
/**
* Failover master, will failover process instance and associated task instance.
* <p>A process instance that belongs to the given masterHost is failovered unless its restartTime is
* after the startup time currently registered for that host.
* <p>Task instances are handed to {@code failoverTaskInstance} before the restart-time check, so they are
* processed even for a process instance that is subsequently skipped.
*
* @param masterHost master host; an empty value makes the method return immediately
*/
private void failoverMaster(String masterHost) {
if (StringUtils.isEmpty(masterHost)) {
return;
}
// null when the host cannot be found in the registered MASTER server list
Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
StopWatch failoverTimeCost = StopWatch.createStarted();
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
LOGGER.info("start master[{}] failover, need to failover process list size:{}", masterHost, needFailoverProcessInstanceList.size());
// servers need to contain master hosts and worker hosts, otherwise the logic task will failover fail.
List<Server> servers = registryClient.getServerList(NodeType.WORKER);
servers.addAll(registryClient.getServerList(NodeType.MASTER));
for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
// a host equal to Constants.NULL is the marker written below once an instance has been failovered
if (Constants.NULL.equals(processInstance.getHost())) {
continue;
}
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance taskInstance : validTaskInstanceList) {
LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance, servers);
}
// skip the process instance itself when it was restarted after the registered startup time of the host
if (serverStartupTime != null && processInstance.getRestartTime() != null
&& processInstance.getRestartTime().after(serverStartupTime)) {
continue;
}
LOGGER.info("failover process instance id: {}", processInstance.getId());
ProcessInstanceMetrics.incProcessInstanceFailover();
//updateProcessInstance host is null and insert into command
processInstance.setHost(Constants.NULL);
processService.processNeedFailoverProcessInstances(processInstance);
}
failoverTimeCost.stop();
LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
/**
* Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
* and failover these tasks.
* <p>
* Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
* <p>
* Candidate tasks are taken from the in-memory workflow cache, not from the database.
*
* @param workerHost worker host; an empty value makes the method return immediately
*/
private void failoverWorker(String workerHost) {
if (StringUtils.isEmpty(workerHost)) {
return;
}
long startTime = System.currentTimeMillis();
// we query the task instance from cache, so that we can directly update the cache
final List<TaskInstance> needFailoverTaskInstanceList = cacheManager.getAll()
.stream()
.flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
.filter(taskInstance ->
workerHost.equals(taskInstance.getHost()) && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
.collect(Collectors.toList());
// per-call memo so each process instance is looked up in the cache manager only once
final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
final List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
try {
ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
if (processInstance == null) {
// NOTE(review): the result of getByProcessInstanceId is dereferenced without a null check;
// if the workflow can leave the cache between the scan above and this lookup this would throw - confirm
processInstance = cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance();
if (processInstance == null) {
LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId());
continue;
}
processInstanceCacheMap.put(processInstance.getId(), processInstance);
}
// only failover the task owned myself if worker down.
if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
continue;
}
LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance, workerServers);
} finally {
// always clear the MDC entries set at the top of this iteration
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
}
/**
 * Fails over a single task instance.
 * <p>
 * Steps, in order:
 * 1. return early when {@code checkTaskInstanceNeedFailover} rejects the task;
 * 2. for a non-master task, build its execution context and, if configured, kill the related yarn job;
 * 3. persist the task with state NEED_FAULT_TOLERANCE;
 * 4. submit a TASK_STATE_CHANGE event to the workflow execute thread pool.
 *
 * @param processInstance owning process instance
 * @param taskInstance task instance to fail over
 * @param servers if failover master, servers contain master servers and worker servers; if failover worker, servers contain worker servers.
 */
private void failoverTaskInstance(@NonNull ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
    if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
        LOGGER.info("The taskInstance doesn't need to failover");
        return;
    }

    TaskMetrics.incTaskFailover();
    final boolean masterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
    taskInstance.setProcessInstance(processInstance);

    if (masterTask) {
        LOGGER.info("The failover taskInstance is a master task");
    } else {
        LOGGER.info("The failover taskInstance is not master task");
        final TaskExecutionContext context = TaskExecutionContextBuilder.get()
            .buildTaskInstanceRelatedInfo(taskInstance)
            .buildProcessInstanceRelatedInfo(processInstance)
            .create();
        if (masterConfig.isKillYarnJobWhenTaskFailover()) {
            // only kill yarn job if exists , the local thread has exited
            ProcessUtils.killYarnJob(context);
        }
    }

    taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
    processService.saveTaskInstance(taskInstance);

    // Tell the owning workflow about the new task state.
    final StateEvent taskStateChange = new StateEvent();
    taskStateChange.setTaskInstanceId(taskInstance.getId());
    taskStateChange.setType(StateEventType.TASK_STATE_CHANGE);
    taskStateChange.setProcessInstanceId(processInstance.getId());
    taskStateChange.setExecutionStatus(taskInstance.getState());
    workflowExecuteThreadPool.submitStateEvent(taskStateChange);
}
/**
 * Collects the master hosts whose process instances need failover.
 * <p>
 * Starts from the hosts returned by {@code queryNeedFailoverProcessInstanceHost} and drops every host
 * that is still registered as a master and is not this server. What remains is this server itself
 * plus hosts no longer present in the registry.
 *
 * @return need failover master servers
 */
private List<String> getNeedFailoverMasterServers() {
    // failover myself && failover dead masters
    final List<String> candidates = processService.queryNeedFailoverProcessInstanceHost();
    for (Iterator<String> it = candidates.iterator(); it.hasNext();) {
        final String host = it.next();
        final boolean aliveRemoteMaster =
            registryClient.checkNodeExists(host, NodeType.MASTER) && !localAddress.equals(host);
        if (aliveRemoteMaster) {
            it.remove();
        }
    }
    return candidates;
}
/**
 * Decides whether a task instance has to be failed over.
 * <p>
 * A task is rejected when it is null, has no host (either {@code null} or the
 * {@code Constants.NULL} marker), is already in a finished state, or started after
 * the server it is assigned to started.
 *
 * @param servers servers, can contain master servers or worker servers
 * @param taskInstance task instance
 * @return true if task instance need fail over
 */
private boolean checkTaskInstanceNeedFailover(List<Server> servers, TaskInstance taskInstance) {
    if (taskInstance == null) {
        LOGGER.error("Master failover task instance error, taskInstance is null");
        return false;
    }
    if (Constants.NULL.equals(taskInstance.getHost())) {
        return false;
    }
    if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
        return false;
    }
    //now no host will execute this task instance,so no need to failover the task
    if (taskInstance.getHost() == null) {
        return false;
    }
    //if task start after server starts, there is no need to failover the task.
    return !checkTaskAfterServerStart(servers, taskInstance);
}
/**
 * Checks whether the task began after the server it is assigned to was started.
 * The task's start time is compared when present, otherwise its submit time.
 *
 * @param servers servers, can contain master servers or worker servers
 * @param taskInstance task instance
 * @return true if the task's start (or submit) time is after the server start date;
 *         false when the task has no host or no startup time is known for that host
 */
private boolean checkTaskAfterServerStart(List<Server> servers, TaskInstance taskInstance) {
    if (StringUtils.isEmpty(taskInstance.getHost())) {
        return false;
    }
    final Date serverStartDate = getServerStartupTime(servers, taskInstance.getHost());
    if (serverStartDate == null) {
        return false;
    }
    final Date taskBeginTime = taskInstance.getStartTime() == null
        ? taskInstance.getSubmitTime()
        : taskInstance.getStartTime();
    return taskBeginTime.after(serverStartDate);
}
/**
 * Builds the registry lock path used while failing over one host.
 *
 * @param nodeType node type of the server being failed over
 * @param host host of the server being failed over
 * @return the node type's failover lock root followed by {@code "/" + host},
 *         or an empty string for node types other than MASTER and WORKER
 */
private String getFailoverLockPath(NodeType nodeType, String host) {
    final String lockRoot;
    switch (nodeType) {
        case MASTER:
            lockRoot = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
            break;
        case WORKER:
            lockRoot = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
            break;
        default:
            return "";
    }
    return lockRoot + "/" + host;
}
/**
 * Looks up the startup time of a host among the servers currently registered for a node type.
 *
 * @param nodeType node type whose server list is queried
 * @param host host in {@code host:port} form
 * @return the matching server's create time, or null when the host is empty or no server matches
 */
private Date getServerStartupTime(NodeType nodeType, String host) {
    return StringUtils.isEmpty(host)
        ? null
        : getServerStartupTime(registryClient.getServerList(nodeType), host);
}
/**
 * Finds the startup time of a host in the given server list.
 * A server matches when its {@code host + Constants.COLON + port} equals the given host string.
 *
 * @param servers candidate servers, may be null or empty
 * @param host host string to look for
 * @return create time of the first matching server, or null when the list is empty or nothing matches
 */
private Date getServerStartupTime(List<Server> servers, String host) {
    if (CollectionUtils.isEmpty(servers)) {
        return null;
    }
    for (Server candidate : servers) {
        final String address = candidate.getHost() + Constants.COLON + candidate.getPort();
        if (host.equals(address)) {
            return candidate.getCreateTime();
        }
    }
    return null;
}
/**
* Returns the address of this master, as computed in the constructor from
* {@code NetUtils.getAddr(masterConfig.getListenPort())}.
*/
public String getLocalAddress() {
return localAddress;
}
}

253
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
/**
 * Fails over the workflow instances, and their unfinished task instances, owned by a master
 * that is no longer registered, or by this master itself.
 */
@Service
public class MasterFailoverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MasterFailoverService.class);

    private final RegistryClient registryClient;
    private final MasterConfig masterConfig;
    private final ProcessService processService;
    // Address of this master, derived from the configured listen port.
    private final String localAddress;

    public MasterFailoverService(@NonNull RegistryClient registryClient,
                                 @NonNull MasterConfig masterConfig,
                                 @NonNull ProcessService processService) {
        this.registryClient = registryClient;
        this.masterConfig = masterConfig;
        this.processService = processService;
        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
    }

    /**
     * check master failover
     * <p>
     * Collects the hosts of process instances that need failover and fails over every host that is
     * either this master or no longer registered as a master.
     */
    @Counted(value = "ds.master.scheduler.failover.check.count")
    @Timed(value = "ds.master.scheduler.failover.check.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
    public void checkMasterFailover() {
        List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
            .stream()
            // failover myself || dead server
            .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
            .distinct()
            .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
            return;
        }
        LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, needFailoverMasterHosts);

        for (String needFailoverMasterHost : needFailoverMasterHosts) {
            failoverMaster(needFailoverMasterHost);
        }
    }

    /**
     * Fails over one master host while holding that host's failover lock.
     * Exceptions are logged and not rethrown; the lock release is always attempted.
     *
     * @param masterHost host of the master to fail over
     */
    public void failoverMaster(String masterHost) {
        String failoverPath = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost;
        try {
            registryClient.getLock(failoverPath);
            doFailoverMaster(masterHost);
        } catch (Exception e) {
            LOGGER.error("Master server failover failed, host:{}", masterHost, e);
        } finally {
            registryClient.releaseLock(failoverPath);
        }
    }

    /**
     * Failover master, will failover process instance and associated task instance.
     * <p>A process instance owned by the given masterHost is failed over unless
     * {@link #checkProcessInstanceNeedFailover} rejects it. Unfinished task instances of an accepted
     * process instance are marked first, then the process instance itself is handed over.
     *
     * @param masterHost master host
     */
    private void doFailoverMaster(@NonNull String masterHost) {
        LOGGER.info("Master[{}] failover starting, need to failover process", masterHost);
        StopWatch failoverTimeCost = StopWatch.createStarted();

        // Empty when the host is not in the registered master list, i.e. the master is not active.
        Optional<Date> masterStartupTimeOptional =
            getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
        List<ProcessInstance> needFailoverProcessInstanceList =
            processService.queryNeedFailoverProcessInstances(masterHost);

        LOGGER.info(
            "Master[{}] failover there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
            masterHost,
            needFailoverProcessInstanceList.size(),
            needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));

        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            try {
                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                LOGGER.info("WorkflowInstance failover starting");
                if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
                    LOGGER.info("WorkflowInstance doesn't need to failover");
                    continue;
                }
                int processInstanceId = processInstance.getId();
                List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
                for (TaskInstance taskInstance : taskInstanceList) {
                    try {
                        LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());
                        LOGGER.info("TaskInstance failover starting");
                        if (!checkTaskInstanceNeedFailover(taskInstance)) {
                            LOGGER.info("The taskInstance doesn't need to failover");
                            continue;
                        }
                        failoverTaskInstance(processInstance, taskInstance);
                        LOGGER.info("TaskInstance failover finished");
                    } finally {
                        LoggerUtils.removeTaskInstanceIdMDC();
                    }
                }

                ProcessInstanceMetrics.incProcessInstanceFailover();
                //updateProcessInstance host is null to mark this processInstance has been failover
                // and insert a failover command
                processInstance.setHost(Constants.NULL);
                processService.processNeedFailoverProcessInstances(processInstance);
                LOGGER.info("WorkflowInstance failover finished");
            } finally {
                LoggerUtils.removeWorkflowInstanceIdMDC();
            }
        }

        failoverTimeCost.stop();
        LOGGER.info("Master[{}] failover finished, useTime:{}ms",
            masterHost,
            failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
    }

    /**
     * Finds the startup time of a host in the given server list.
     * A server matches when its {@code host + Constants.COLON + port} equals the given host string.
     *
     * @param servers candidate servers, may be null or empty
     * @param host host string to look for
     * @return create time of the first matching server, empty when nothing matches
     */
    private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
        if (CollectionUtils.isEmpty(servers)) {
            return Optional.empty();
        }
        for (Server server : servers) {
            if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
                return Optional.ofNullable(server.getCreateTime());
            }
        }
        return Optional.empty();
    }

    /**
     * failover task instance
     * <p>
     * 1. kill yarn job if the task is not a master task and yarn-job killing is enabled in the config.
     * 2. change task state to NEED_FAULT_TOLERANCE and set its flag to NO.
     * 3. persist the task instance.
     *
     * @param processInstance owning process instance
     * @param taskInstance task instance to fail over
     */
    private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
        TaskMetrics.incTaskFailover();
        boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());

        taskInstance.setProcessInstance(processInstance);

        if (!isMasterTask) {
            LOGGER.info("The failover taskInstance is not master task");
            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                .buildTaskInstanceRelatedInfo(taskInstance)
                .buildProcessInstanceRelatedInfo(processInstance)
                .create();

            if (masterConfig.isKillYarnJobWhenTaskFailover()) {
                // only kill yarn job if exists , the local thread has exited
                LOGGER.info("TaskInstance failover begin kill the task related yarn job");
                ProcessUtils.killYarnJob(taskExecutionContext);
            }
        } else {
            LOGGER.info("The failover taskInstance is a master task");
        }

        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
        taskInstance.setFlag(Flag.NO);
        processService.saveTaskInstance(taskInstance);
    }

    /**
     * A task instance needs failover unless it is already in a finished state.
     */
    private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) {
        // The task is already finished, so we don't need to failover this task instance
        return taskInstance.getState() == null || !taskInstance.getState().typeIsFinished();
    }

    /**
     * Decides whether a process instance of the master being failed over really needs failover.
     *
     * @param beFailoveredMasterStartupTimeOptional startup time of the master being failed over,
     *                                              empty when that master is not registered
     * @param processInstance candidate process instance
     * @return false when the instance is already marked as failed over, or was started or restarted
     *         after the master's current startup time; true otherwise
     */
    private boolean checkProcessInstanceNeedFailover(Optional<Date> beFailoveredMasterStartupTimeOptional,
                                                     @NonNull ProcessInstance processInstance) {
        // The process has already been failover, since when we do master failover we will hold a lock, so we can guarantee
        // the host will not be set concurrent.
        if (Constants.NULL.equals(processInstance.getHost())) {
            return false;
        }
        if (!beFailoveredMasterStartupTimeOptional.isPresent()) {
            // the master is not active, we can failover all it's processInstance
            return true;
        }
        Date beFailoveredMasterStartupTime = beFailoveredMasterStartupTimeOptional.get();

        // Null-safe: one instance without a start time must not abort the failover of the others.
        Date startTime = processInstance.getStartTime();
        if (startTime != null && startTime.after(beFailoveredMasterStartupTime)) {
            // The processInstance is newly created
            return false;
        }
        // An instance restarted after the master came back up has already been recovered onto it;
        // failing it over again would rerun it.
        Date restartTime = processInstance.getRestartTime();
        if (restartTime != null && restartTime.after(beFailoveredMasterStartupTime)) {
            return false;
        }

        return true;
    }
}

266
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import lombok.NonNull;
@Service
public class WorkerFailoverService {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerFailoverService.class);
private final RegistryClient registryClient;
private final MasterConfig masterConfig;
private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
private final ProcessInstanceExecCacheManager cacheManager;
private final String localAddress;
/**
 * Stores the collaborators used for worker failover and derives this master's address
 * from the configured listen port.
 */
public WorkerFailoverService(@NonNull RegistryClient registryClient,
                             @NonNull MasterConfig masterConfig,
                             @NonNull ProcessService processService,
                             @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
                             @NonNull ProcessInstanceExecCacheManager cacheManager) {
    // registry and configuration
    this.registryClient = registryClient;
    this.masterConfig = masterConfig;
    this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());

    // persistence and in-memory workflow state
    this.processService = processService;
    this.cacheManager = cacheManager;
    this.workflowExecuteThreadPool = workflowExecuteThreadPool;
}
/**
 * Do the worker failover for the given worker host.
 * <p>
 * Candidate task instances come from {@code getNeedFailoverTaskInstance(workerHost)}. Each candidate is
 * checked again with {@code checkTaskInstanceNeedFailover} before it is failed over.
 * <p>
 * A failure on one task instance is logged as an error and does not stop the remaining ones.
 *
 * @param workerHost worker host
 */
public void failoverWorker(@NonNull String workerHost) {
    LOGGER.info("Worker[{}] failover starting", workerHost);
    final StopWatch failoverTimeCost = StopWatch.createStarted();

    // Startup time currently registered for the worker, if any.
    final Optional<Date> needFailoverWorkerStartTime =
        getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);

    // we query the task instance from cache, so that we can directly update the cache
    final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
    if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {
        LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost);
        return;
    }
    LOGGER.info(
        "Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}",
        workerHost,
        needFailoverTaskInstanceList.size(),
        needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList()));

    // Per-call memo of process instances; a missing workflow maps to null and is rejected by the check below.
    final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
    for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
        try {
            ProcessInstance processInstance =
                processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k -> {
                    WorkflowExecuteRunnable workflowExecuteRunnable =
                        cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId());
                    if (workflowExecuteRunnable == null) {
                        return null;
                    }
                    return workflowExecuteRunnable.getProcessInstance();
                });
            if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
                LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
                continue;
            }
            LOGGER.info(
                "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
                workerHost);
            failoverTaskInstance(processInstance, taskInstance);
            LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
        } catch (Exception ex) {
            // A failed failover leaves the task unrecovered, so report it at ERROR level.
            LOGGER.error("Worker[{}] failover taskInstance occur exception", workerHost, ex);
        } finally {
            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
        }
    }
    failoverTimeCost.stop();
    LOGGER.info("Worker[{}] failover finished, useTime:{}ms",
        workerHost,
        failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
/**
 * Fails over a single task instance of a workflow owned by this master.
 * <p>
 * Steps, in order:
 * 1. for a non-master task, build its execution context and, if configured, kill the related yarn job;
 * 2. persist the task with state NEED_FAULT_TOLERANCE and flag NO;
 * 3. submit a TASK_STATE_CHANGE event to the workflow execute thread pool.
 *
 * @param processInstance owning process instance
 * @param taskInstance task instance to fail over
 */
private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
    TaskMetrics.incTaskFailover();
    final boolean masterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());

    taskInstance.setProcessInstance(processInstance);

    if (masterTask) {
        LOGGER.info("The failover taskInstance is a master task");
    } else {
        LOGGER.info("The failover taskInstance is not master task");
        final TaskExecutionContext context = TaskExecutionContextBuilder.get()
            .buildTaskInstanceRelatedInfo(taskInstance)
            .buildProcessInstanceRelatedInfo(processInstance)
            .create();

        if (masterConfig.isKillYarnJobWhenTaskFailover()) {
            // only kill yarn job if exists , the local thread has exited
            LOGGER.info("TaskInstance failover begin kill the task related yarn job");
            ProcessUtils.killYarnJob(context);
        }
    }

    taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
    taskInstance.setFlag(Flag.NO);
    processService.saveTaskInstance(taskInstance);

    // Tell the owning workflow about the new task state.
    final StateEvent taskStateChange = new StateEvent();
    taskStateChange.setTaskInstanceId(taskInstance.getId());
    taskStateChange.setType(StateEventType.TASK_STATE_CHANGE);
    taskStateChange.setProcessInstanceId(processInstance.getId());
    taskStateChange.setExecutionStatus(taskInstance.getState());
    workflowExecuteThreadPool.submitStateEvent(taskStateChange);
}
/**
 * Decides whether a task instance that was found on a failed worker really has to be failed over.
 * <p>
 * The answer is {@code false} when the process instance or the task instance is {@code null}, when the
 * process instance's host is not this master's {@code localAddress}, when the task instance is
 * already in a finished state, or when the worker start time is known and the task instance was
 * submitted after it. In every other case the answer is {@code true}.
 *
 * @param needFailoverWorkerStartTime start time of the worker being failed over; empty when no
 *                                    start time is available for it
 * @param processInstance             the workflow instance owning the task instance, may be {@code null}
 * @param taskInstance                the task instance to check
 * @return true if task instance need fail over
 */
private boolean checkTaskInstanceNeedFailover(Optional<Date> needFailoverWorkerStartTime,
                                              @Nullable ProcessInstance processInstance,
                                              TaskInstance taskInstance) {
    if (processInstance == null) {
        // This case shouldn't happen.
        LOGGER.error(
                "Failover task instance error, cannot find the related processInstance form memory, this case shouldn't happened");
        return false;
    }
    if (taskInstance == null) {
        // This case shouldn't happen.
        LOGGER.error("Master failover task instance error, taskInstance is null, this case shouldn't happened");
        return false;
    }

    // On a worker failure each master only fails over the tasks of workflows it owns itself.
    final String ownerHost = processInstance.getHost();
    if (!StringUtils.equalsIgnoreCase(ownerHost, localAddress)) {
        LOGGER.error(
                "Master failover task instance error, the taskInstance's processInstance's host: {} is not the current master: {}",
                ownerHost,
                localAddress);
        return false;
    }

    // A task instance that has already finished has nothing left to recover.
    if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
        LOGGER.info("The task is already finished, doesn't need to failover");
        return false;
    }

    // No start time available: the worker is still down, so the task must be failed over.
    if (!needFailoverWorkerStartTime.isPresent()) {
        return true;
    }

    // The worker is active again and may already have received newly submitted tasks.
    final Date workerStartTime = needFailoverWorkerStartTime.get();
    final Date submitTime = taskInstance.getSubmitTime();
    final boolean submittedAfterWorkerStart = submitTime != null && submitTime.after(workerStartTime);
    if (submittedAfterWorkerStart) {
        LOGGER.info(
                "The taskInstance's submitTime: {} is after the need failover worker's start time: {}, the taskInstance is newly submit, it doesn't need to failover",
                submitTime,
                workerStartTime);
        return false;
    }
    return true;
}
/**
 * Collects, from the in-memory workflow cache, the task instances whose host equals the given worker
 * host and whose state passes {@code ExecutionStatus.isNeedFailoverWorkflowInstanceState}.
 *
 * @param failoverWorkerHost host of the worker being failed over
 * @return the matching task instances; empty when none match
 */
private List<TaskInstance> getNeedFailoverTaskInstance(@NonNull String failoverWorkerHost) {
    // Read from the cache rather than the database so the cached objects can be updated directly.
    return cacheManager.getAll()
            .stream()
            .flatMap(runnable -> runnable.getAllTaskInstances().stream())
            // A task instance whose host is not set (e.g. still dispatching) never equals the worker host.
            .filter(candidate -> failoverWorkerHost.equals(candidate.getHost()))
            .filter(candidate -> ExecutionStatus.isNeedFailoverWorkflowInstanceState(candidate.getState()))
            .collect(Collectors.toList());
}
/**
 * Looks up the create time of the first server whose {@code host:port} address equals {@code host}.
 *
 * @param servers the servers to search, may be {@code null} or empty
 * @param host    the address to look for, in {@code host:port} form
 * @return the create time of the first matching server; empty when the list is empty, no server
 *         matches, or the matching server has no create time
 */
private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
    if (CollectionUtils.isEmpty(servers)) {
        return Optional.empty();
    }
    return servers.stream()
            .filter(candidate -> host.equals(candidate.getHost() + Constants.COLON + candidate.getPort()))
            .findFirst()
            // Optional.map yields an empty Optional when the create time is null.
            .map(Server::getCreateTime);
}
}

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java

@ -52,7 +52,7 @@ public class ExecutionContextTestUtils {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
Command command = requestCommand.convert2Command();
ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER);
ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(port)));
return executionContext;

4
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -70,7 +70,7 @@ public class NettyExecutorManagerTest {
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER);
ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
Boolean execute = nettyExecutorManager.execute(executionContext);
Assert.assertTrue(execute);
@ -89,7 +89,7 @@ public class NettyExecutorManagerTest {
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER);
ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(4444)));
nettyExecutorManager.execute(executionContext);

22
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -26,13 +26,14 @@ import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -98,9 +99,17 @@ public class FailoverServiceTest {
springApplicationContext.setApplicationContext(applicationContext);
given(masterConfig.getListenPort()).willReturn(masterPort);
failoverService = new FailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager);
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,
workflowExecuteThreadPool,
cacheManager);
failoverService = new FailoverService(masterFailoverService, workerFailoverService);
testMasterHost = failoverService.getLocalAddress();
testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
String ip = testMasterHost.split(":")[0];
int port = Integer.valueOf(testMasterHost.split(":")[1]);
Assert.assertEquals(masterPort, port);
@ -118,6 +127,7 @@ public class FailoverServiceTest {
processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setHost(testMasterHost);
processInstance.setStartTime(new Date());
processInstance.setRestartTime(new Date());
processInstance.setHistoryCmd("xxx");
processInstance.setCommandType(CommandType.STOP);
@ -154,16 +164,10 @@ public class FailoverServiceTest {
given(registryClient.getServerList(NodeType.WORKER)).willReturn(new ArrayList<>(Arrays.asList(workerServer)));
given(registryClient.getServerList(NodeType.MASTER)).willReturn(new ArrayList<>(Arrays.asList(masterServer)));
ReflectionTestUtils.setField(failoverService, "registryClient", registryClient);
doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class));
}
@Test
public void checkMasterFailoverTest() {
failoverService.checkMasterFailover();
}
@Test
public void failoverMasterTest() {
processInstance.setHost(Constants.NULL);

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
@ -62,7 +63,8 @@ import org.springframework.transaction.annotation.Transactional;
public interface ProcessService {
@Transactional
ProcessInstance handleCommand(String host, Command command) throws CronParseException;
ProcessInstance handleCommand(String host, Command command)
throws CronParseException, CodeGenerateUtils.CodeGenerateException;
void moveToErrorCommand(Command command, String message);

42
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -128,7 +128,6 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
@ -276,6 +275,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private CuringParamsService curingGlobalParamsService;
@Autowired
private ProcessService processService;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@ -285,7 +287,8 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
@Transactional
public ProcessInstance handleCommand(String host, Command command) throws CronParseException {
public ProcessInstance handleCommand(String host, Command command) throws CronParseException,
CodeGenerateException {
ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null
if (processInstance == null) {
@ -762,7 +765,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
Command command,
Map<String, String> cmdParam) {
Map<String, String> cmdParam) throws CodeGenerateException {
ProcessInstance processInstance = new ProcessInstance(processDefinition);
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
@ -922,7 +925,8 @@ public class ProcessServiceImpl implements ProcessService {
* @param host host
* @return process instance
*/
protected ProcessInstance constructProcessInstance(Command command, String host) throws CronParseException {
protected ProcessInstance constructProcessInstance(Command command, String host)
throws CronParseException, CodeGenerateException {
ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
@ -1028,6 +1032,7 @@ public class ProcessServiceImpl implements ProcessService {
case RECOVER_TOLERANCE_FAULT_PROCESS:
// recover tolerance fault process
processInstance.setRecovery(Flag.YES);
processInstance.setRunTimes(runTime + 1);
runStatus = processInstance.getState();
break;
case COMPLEMENT_DATA:
@ -1273,11 +1278,15 @@ public class ProcessServiceImpl implements ProcessService {
while (retryTimes <= commitRetryTimes) {
try {
// submit task to db
task = SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance, taskInstance);
// Only want to use transaction here
task = processService.submitTask(processInstance, taskInstance);
if (task != null && task.getId() != 0) {
break;
}
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
logger.error(
"task commit to db failed , taskId {} has already retry {} times, please check the database",
taskInstance.getId(),
retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
logger.error("task commit to db failed", e);
@ -1299,13 +1308,17 @@ public class ProcessServiceImpl implements ProcessService {
@Override
@Transactional
public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("start submit task : {}, processInstance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
logger.info("Start save taskInstance to database : {}, processInstance id:{}, state: {}",
taskInstance.getName(),
taskInstance.getProcessInstanceId(),
processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
if (task == null) {
logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ",
taskInstance.getName(),
taskInstance.getProcessInstance(),
processInstance.getState());
return null;
}
@ -1313,8 +1326,13 @@ public class ProcessServiceImpl implements ProcessService {
createSubWorkProcess(processInstance, task);
}
logger.info("end submit task to db successfully:{} {} state:{} complete, instance id:{} state: {} ",
taskInstance.getId(), taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
logger.info(
"End save taskInstance to db successfully:{}, taskInstanceName: {}, taskInstance state:{}, processInstanceId:{}, processInstanceState: {}",
taskInstance.getId(),
taskInstance.getName(),
task.getState(),
processInstance.getId(),
processInstance.getState());
return task;
}

41
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java

@ -211,16 +211,45 @@ public class TaskPriority implements Comparable<TaskPriority> {
}
TaskPriority that = (TaskPriority) o;
return processInstancePriority == that.processInstancePriority
&& processInstanceId == that.processInstanceId
&& taskInstancePriority == that.taskInstancePriority
&& taskId == that.taskId
&& taskGroupPriority == that.taskGroupPriority
&& Objects.equals(groupName, that.groupName);
&& processInstanceId == that.processInstanceId
&& taskInstancePriority == that.taskInstancePriority
&& taskId == that.taskId
&& taskGroupPriority == that.taskGroupPriority
&& Objects.equals(groupName, that.groupName);
}
@Override
public int hashCode() {
return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, taskGroupPriority, groupName);
return Objects.hash(processInstancePriority,
processInstanceId,
taskInstancePriority,
taskId,
taskGroupPriority,
groupName);
}
/**
 * Renders the fields of this task priority as {@code TaskPriority{name=value, ...}}.
 *
 * @return a textual representation of this object
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("TaskPriority{");
    text.append("processInstancePriority=").append(processInstancePriority);
    text.append(", processInstanceId=").append(processInstanceId);
    text.append(", taskInstancePriority=").append(taskInstancePriority);
    text.append(", taskId=").append(taskId);
    text.append(", taskExecutionContext=").append(taskExecutionContext);
    // groupName is the only value wrapped in single quotes.
    text.append(", groupName='").append(groupName).append('\'');
    text.append(", context=").append(context);
    text.append(", checkpoint=").append(checkpoint);
    text.append(", taskGroupPriority=").append(taskGroupPriority);
    return text.append('}').toString();
}
}

5
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
@ -285,7 +286,7 @@ public class ProcessServiceTest {
}
@Test
public void testHandleCommand() throws CronParseException {
public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException {
//cannot construct process instance, return null;
String host = "127.0.0.1";
@ -462,7 +463,7 @@ public class ProcessServiceTest {
}
@Test(expected = ServiceException.class)
public void testDeleteNotExistCommand() throws CronParseException {
public void testDeleteNotExistCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException {
String host = "127.0.0.1";
int definitionVersion = 1;
long definitionCode = 123;

Loading…
Cancel
Save