From 3f69ec8f28c7206c153f5de8c7c0d1fa33d311f9 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sat, 9 Jul 2022 11:54:59 +0800 Subject: [PATCH] [Fix-10842] Fix master/worker failover will cause status incorrect (#10839) * Fix master failover will not update task instance status * Add some failover log * Fix worker failover will rerun task more than once * Fix workflowInstance failover may rerun already success taskInstance --- .../dolphinscheduler/common/graph/DAG.java | 11 + .../consumer/TaskPriorityQueueConsumer.java | 32 +- .../dispatch/context/ExecutionContext.java | 45 +-- .../executor/NettyExecutorManager.java | 3 + .../processor/queue/TaskExecuteRunnable.java | 153 ++++---- .../registry/MasterRegistryDataListener.java | 1 + .../master/runner/FailoverExecuteThread.java | 5 +- .../runner/StateWheelExecuteThread.java | 12 +- .../runner/WorkflowExecuteRunnable.java | 125 ++++-- .../runner/task/CommonTaskProcessor.java | 5 +- .../master/service/FailoverService.java | 371 +----------------- .../master/service/MasterFailoverService.java | 253 ++++++++++++ .../master/service/WorkerFailoverService.java | 266 +++++++++++++ .../dispatch/ExecutionContextTestUtils.java | 2 +- .../executor/NettyExecutorManagerTest.java | 4 +- .../master/service/FailoverServiceTest.java | 22 +- .../service/process/ProcessService.java | 4 +- .../service/process/ProcessServiceImpl.java | 42 +- .../service/queue/TaskPriority.java | 41 +- .../service/process/ProcessServiceTest.java | 5 +- 20 files changed, 852 insertions(+), 550 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java index e57b3dd93e..afa780163a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java @@ -498,5 +498,16 @@ public class DAG { return new AbstractMap.SimpleEntry<>(notZeroIndegreeNodeMap.size() == 0, topoResultList); } + @Override + public String toString() { + return "DAG{" + + "nodesMap=" + + nodesMap + + ", edgesMap=" + + edgesMap + + ", reverseEdgesMap=" + + reverseEdgesMap + + '}'; + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index f04ae15dd8..82198e942a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -34,11 +34,11 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; -import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.commons.collections.CollectionUtils; @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -187,8 +188,24 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread { TaskMetrics.incTaskDispatch(); boolean result = false; try { + WorkflowExecuteRunnable workflowExecuteRunnable = + processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId()); + if (workflowExecuteRunnable == null) { + logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority); + return true; + } + Optional taskInstanceOptional = + workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId()); + if (!taskInstanceOptional.isPresent()) { + logger.error("Cannot find the task instance from related processInstance, taskPriority: {}", + taskPriority); + // we return true, so that we will drop this task. + return true; + } + TaskInstance taskInstance = taskInstanceOptional.get(); TaskExecutionContext context = taskPriority.getTaskExecutionContext(); - ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup()); + ExecutionContext executionContext = + new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance); if (isTaskNeedToCheck(taskPriority)) { if (taskInstanceIsFinalState(taskPriority.getTaskId())) { @@ -196,16 +213,21 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread { return true; } } + result = dispatcher.dispatch(executionContext); if (result) { - logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId()); + logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}", + taskPriority.getTaskId(), + executionContext.getHost()); addDispatchEvent(context, executionContext); } else { - logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId()); + logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}", + taskPriority.getTaskId(), + executionContext.getHost()); } } catch (RuntimeException | ExecuteException e) { - logger.error("Master dispatch task to worker error: ", e); + logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e); } return result; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java index b3fba87870..880640e91d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java @@ -19,13 +19,17 @@ package org.apache.dolphinscheduler.server.master.dispatch.context; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; +import lombok.Data; + /** * execution context */ +@Data public class ExecutionContext { /** @@ -34,51 +38,30 @@ public class ExecutionContext { private Host host; /** - * command + * command */ private final Command command; + private final TaskInstance taskInstance; + /** - * executor type : worker or client + * executor type : worker or client */ private final ExecutorType executorType; /** - * worker group + * worker group */ - private String workerGroup; + private final String workerGroup; - public ExecutionContext(Command command, ExecutorType executorType) { - this(command, executorType, DEFAULT_WORKER_GROUP); + public ExecutionContext(Command command, ExecutorType executorType, TaskInstance taskInstance) { + this(command, executorType, DEFAULT_WORKER_GROUP, taskInstance); } - public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) { + public ExecutionContext(Command command, ExecutorType executorType, String workerGroup, TaskInstance taskInstance) { this.command = command; this.executorType = executorType; this.workerGroup = workerGroup; - } - - public Command getCommand() { - return command; - } - - public ExecutorType getExecutorType() { - return executorType; - } - - public void setWorkerGroup(String workerGroup) { - this.workerGroup = workerGroup; - } - - public String getWorkerGroup() { - return this.workerGroup; - } - - public Host getHost() { - return host; - } - - public void setHost(Host host) { - this.host = host; + this.taskInstance = taskInstance; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 82f3416569..fe172f9816 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -117,6 +117,9 @@ public class NettyExecutorManager extends AbstractExecutorManager { doExecute(host, command); success = true; context.setHost(host); + // We set the host to taskInstance to avoid when the worker down, this taskInstance may not be failovered, due to the taskInstance's host + // is not belongs to the down worker ISSUE-10842. + context.getTaskInstance().setHost(host.getAddress()); } catch (ExecuteException ex) { logger.error(String.format("execute command : %s error", command), ex); try { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java index f2a6d6873e..9fc96e7564 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.Event; -import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -27,6 +26,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; @@ -76,7 +76,7 @@ public class TaskExecuteRunnable implements Runnable { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId()); persist(event); } catch (Exception e) { - logger.error("persist error, event:{}, error: {}", event, e); + logger.error("persist task event error, event:{}", event, e); } finally { this.events.remove(event); LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); @@ -114,37 +114,44 @@ public class TaskExecuteRunnable implements Runnable { * * @param taskEvent taskEvent */ - private void persist(TaskEvent taskEvent) { + private void persist(TaskEvent taskEvent) throws Exception { Event event = taskEvent.getEvent(); int taskInstanceId = taskEvent.getTaskInstanceId(); int processInstanceId = taskEvent.getProcessInstanceId(); Optional taskInstance; - WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + WorkflowExecuteRunnable workflowExecuteRunnable = + this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) { taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId); } else { taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId)); } + boolean needToSendEvent = true; switch (event) { case DISPATCH: - handleDispatchEvent(taskEvent, taskInstance); + needToSendEvent = handleDispatchEvent(taskEvent, taskInstance); // dispatch event do not need to submit state event - return; + break; case DELAY: case RUNNING: - handleRunningEvent(taskEvent, taskInstance); + needToSendEvent = handleRunningEvent(taskEvent, taskInstance); break; case RESULT: - handleResultEvent(taskEvent, taskInstance); + needToSendEvent = handleResultEvent(taskEvent, taskInstance); break; case WORKER_REJECT: - handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteRunnable); + needToSendEvent = + handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteRunnable); break; default: throw new IllegalArgumentException("invalid event type : " + event); } + if (!needToSendEvent) { + logger.info("Handle task event: {} success, there is no need to send a StateEvent", taskEvent); + return; + } StateEvent stateEvent = new StateEvent(); stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); @@ -157,101 +164,101 @@ public class TaskExecuteRunnable implements Runnable { /** * handle dispatch event */ - private void handleDispatchEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { + private boolean handleDispatchEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { if (!taskInstanceOptional.isPresent()) { logger.error("taskInstance is null"); - return; + return false; } TaskInstance taskInstance = taskInstanceOptional.get(); if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) { - return; + return false; } taskInstance.setState(ExecutionStatus.DISPATCH); taskInstance.setHost(taskEvent.getWorkerAddress()); processService.saveTaskInstance(taskInstance); + return true; } /** * handle running event */ - private void handleRunningEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { + private boolean handleRunningEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { Channel channel = taskEvent.getChannel(); - try { - if (taskInstanceOptional.isPresent()) { - TaskInstance taskInstance = taskInstanceOptional.get(); - if (taskInstance.getState().typeIsFinished()) { - logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState()); - } else { - taskInstance.setState(taskEvent.getState()); - taskInstance.setStartTime(taskEvent.getStartTime()); - taskInstance.setHost(taskEvent.getWorkerAddress()); - taskInstance.setLogPath(taskEvent.getLogPath()); - taskInstance.setExecutePath(taskEvent.getExecutePath()); - taskInstance.setPid(taskEvent.getProcessId()); - taskInstance.setAppLink(taskEvent.getAppIds()); - processService.saveTaskInstance(taskInstance); - } - } - // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success - // send ack to worker - TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); - channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); - } catch (Exception e) { - logger.error("handle worker ack master error", e); - TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1); - channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); - } - } - - /** - * handle result event - */ - private void handleResultEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { - Channel channel = taskEvent.getChannel(); - try { - if (taskInstanceOptional.isPresent()) { - TaskInstance taskInstance = taskInstanceOptional.get(); - dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); - + if (taskInstanceOptional.isPresent()) { + TaskInstance taskInstance = taskInstanceOptional.get(); + if (taskInstance.getState().typeIsFinished()) { + logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", + taskInstance.getId(), + taskInstance.getState()); + return false; + } else { + taskInstance.setState(taskEvent.getState()); taskInstance.setStartTime(taskEvent.getStartTime()); taskInstance.setHost(taskEvent.getWorkerAddress()); taskInstance.setLogPath(taskEvent.getLogPath()); taskInstance.setExecutePath(taskEvent.getExecutePath()); taskInstance.setPid(taskEvent.getProcessId()); taskInstance.setAppLink(taskEvent.getAppIds()); - taskInstance.setState(taskEvent.getState()); - taskInstance.setEndTime(taskEvent.getEndTime()); - taskInstance.setVarPool(taskEvent.getVarPool()); - processService.changeOutParam(taskInstance); processService.saveTaskInstance(taskInstance); } - // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success - TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); - channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); - } catch (Exception e) { - logger.error("handle worker response master error", e); - TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1); - channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); } + // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success + // send ack to worker + TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = + new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); + return true; } /** * handle result event */ - private void handleWorkerRejectEvent(Channel channel, Optional taskInstanceOptional, WorkflowExecuteRunnable executeThread) { - TaskInstance taskInstance = taskInstanceOptional.orElseThrow(() -> new RuntimeException("taskInstance is null")); - try { - if (executeThread != null) { - executeThread.resubmit(taskInstance.getTaskCode()); - } - if (channel != null) { - TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId()); - channel.writeAndFlush(taskRecallAckCommand.convert2Command()); + private boolean handleResultEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { + Channel channel = taskEvent.getChannel(); + if (taskInstanceOptional.isPresent()) { + TaskInstance taskInstance = taskInstanceOptional.get(); + if (taskInstance.getState().typeIsFinished()) { + logger.warn("The current taskInstance has already been finished, taskEvent: {}", taskEvent); + return false; } - } catch (Exception e) { - logger.error("handle worker reject error", e); - TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance.getId()); + + dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); + + taskInstance.setStartTime(taskEvent.getStartTime()); + taskInstance.setHost(taskEvent.getWorkerAddress()); + taskInstance.setLogPath(taskEvent.getLogPath()); + taskInstance.setExecutePath(taskEvent.getExecutePath()); + taskInstance.setPid(taskEvent.getProcessId()); + taskInstance.setAppLink(taskEvent.getAppIds()); + taskInstance.setState(taskEvent.getState()); + taskInstance.setEndTime(taskEvent.getEndTime()); + taskInstance.setVarPool(taskEvent.getVarPool()); + processService.changeOutParam(taskInstance); + processService.saveTaskInstance(taskInstance); + } + // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success + TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = + new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); + return true; + } + + /** + * handle result event + */ + private boolean handleWorkerRejectEvent(Channel channel, + Optional taskInstanceOptional, + WorkflowExecuteRunnable executeThread) throws Exception { + TaskInstance taskInstance = + taskInstanceOptional.orElseThrow(() -> new RuntimeException("taskInstance is null")); + if (executeThread != null) { + executeThread.resubmit(taskInstance.getTaskCode()); + } + if (channel != null) { + TaskRecallAckCommand taskRecallAckCommand = + new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId()); channel.writeAndFlush(taskRecallAckCommand.convert2Command()); } + return true; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java index 30cea36db7..1eafb4cb54 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java @@ -64,6 +64,7 @@ public class MasterRegistryDataListener implements SubscribeListener { break; case REMOVE: masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true); + break; default: break; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java index 63f4215f27..16656c8760 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.service.FailoverService; +import org.apache.dolphinscheduler.server.master.service.MasterFailoverService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,7 @@ public class FailoverExecuteThread extends BaseDaemonThread { * failover service */ @Autowired - private FailoverService failoverService; + private MasterFailoverService masterFailoverService; protected FailoverExecuteThread() { super("FailoverExecuteThread"); @@ -63,7 +64,7 @@ public class FailoverExecuteThread extends BaseDaemonThread { try { // todo: DO we need to schedule a task to do this kind of check // This kind of check may only need to be executed when a master server start - failoverService.checkMasterFailover(); + masterFailoverService.checkMasterFailover(); } catch (Exception e) { logger.error("Master failover thread execute error", e); } finally { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index c70fb1f1d4..843c1b246c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; @@ -31,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; import java.util.Optional; @@ -296,15 +296,21 @@ public class StateWheelExecuteThread extends BaseDaemonThread { } if (!taskInstanceOptional.isPresent()) { - logger.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check"); + logger.warn( + "Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check"); taskInstanceRetryCheckList.remove(taskInstanceKey); continue; } TaskInstance taskInstance = taskInstanceOptional.get(); - if (taskInstance.retryTaskIntervalOverTime()) { + // We check the status to avoid when we do worker failover we submit a failover task, this task may be resubmit by this + // thread + if (taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE + && taskInstance.retryTaskIntervalOverTime()) { // reset taskInstance endTime and state // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance + logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance", + taskInstance.getId()); taskInstance.setEndTime(null); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 525aa6154e..32d1724b15 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -108,6 +108,7 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.BeanUtils; import com.google.common.collect.Lists; @@ -141,7 +142,7 @@ public class WorkflowExecuteRunnable implements Callable { /** * process instance */ - private ProcessInstance processInstance; + private final ProcessInstance processInstance; /** * process definition @@ -298,6 +299,7 @@ public class WorkflowExecuteRunnable implements Callable { StateEventHandler stateEventHandler = StateEventHandlerManager.getStateEventHandler(stateEvent.getType()) .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event")); + logger.info("Begin to handle state event, {}", stateEvent); if (stateEventHandler.handleStateEvent(this, stateEvent)) { this.stateEvents.remove(stateEvent); } @@ -483,8 +485,12 @@ public class WorkflowExecuteRunnable implements Callable { */ public void refreshProcessInstance(int processInstanceId) { logger.info("process instance update: {}", processInstanceId); - processInstance = processService.findProcessInstanceById(processInstanceId); - processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); + ProcessInstance newProcessInstance = processService.findProcessInstanceById(processInstanceId); + // just update the processInstance field(this is soft copy) + BeanUtils.copyProperties(newProcessInstance, processInstance); + + processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); } @@ -770,6 +776,7 @@ public class WorkflowExecuteRunnable implements Callable { } // generate process dag dag = DagHelper.buildDagGraph(processDag); + logger.info("Build dag success, dag: {}", dag); } /** @@ -784,45 +791,60 @@ public class WorkflowExecuteRunnable implements Callable { errorTaskMap.clear(); if (!isNewProcessInstance()) { + logger.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}", + processInstance.getRunTimes(), + processInstance.getRecovery()); List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); for (TaskInstance task : validTaskInstanceList) { - if (validTaskMap.containsKey(task.getTaskCode())) { - int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); - TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); - if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) { - task.setFlag(Flag.NO); - processService.updateTaskInstance(task); - continue; + try { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId()); + logger.info( + "Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}", + task.getTaskCode(), + task.getState()); + if (validTaskMap.containsKey(task.getTaskCode())) { + int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); + TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); + if (!oldTaskInstance.getState().typeIsFinished() && task.getState().typeIsFinished()) { + task.setFlag(Flag.NO); + processService.updateTaskInstance(task); + continue; + } + logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", + task.getTaskCode()); } - logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", - task.getTaskCode()); - } - validTaskMap.put(task.getTaskCode(), task.getId()); - taskInstanceMap.put(task.getId(), task); + validTaskMap.put(task.getTaskCode(), task.getId()); + taskInstanceMap.put(task.getId(), task); - if (task.isTaskComplete()) { - completeTaskMap.put(task.getTaskCode(), task.getId()); - continue; - } - if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) { - continue; - } - if (task.taskCanRetry()) { - if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) { - // tolerantTaskInstance add to standby list directly - TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task); - addTaskToStandByList(tolerantTaskInstance); - } else { - retryTaskInstance(task); + if (task.isTaskComplete()) { + completeTaskMap.put(task.getTaskCode(), task.getId()); + continue; } - continue; - } - if (task.getState().typeIsFailure()) { - errorTaskMap.put(task.getTaskCode(), task.getId()); + if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), + dag)) { + continue; + } + if (task.taskCanRetry()) { + if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) { + // tolerantTaskInstance add to standby list directly + TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task); + addTaskToStandByList(tolerantTaskInstance); + } else { + retryTaskInstance(task); + } + continue; + } + if (task.getState().typeIsFailure()) { + errorTaskMap.put(task.getTaskCode(), task.getId()); + } + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } + } else { + logger.info("The current workflowInstance is a newly running workflowInstance"); } if (processInstance.isComplementData() && complementListDate.isEmpty()) { @@ -849,15 +871,22 @@ public class WorkflowExecuteRunnable implements Callable { if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(complementListDate.get(0)); - String globalParams = - curingParamsService.curingGlobalParams(processInstance.getId(), processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA, - processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)); + String globalParams = curingParamsService.curingGlobalParams(processInstance.getId(), + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, + processInstance.getScheduleTime(), + cmdParam.get(Constants.SCHEDULE_TIMEZONE)); processInstance.setGlobalParams(globalParams); processService.updateProcessInstance(processInstance); } } } } + logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}", + dependFailedTaskMap, + completeTaskMap, + errorTaskMap); } /** @@ -899,6 +928,15 @@ public class WorkflowExecuteRunnable implements Callable { validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); taskInstanceMap.put(taskInstance.getId(), taskInstance); activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor); + boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH); + if (!dispatchSuccess) { + logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!", + processInstance.getId(), + processInstance.getName(), + taskInstance.getId(), + taskInstance.getName()); + return Optional.empty(); + } taskProcessor.action(TaskAction.RUN); stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance); @@ -1816,14 +1854,19 @@ public class WorkflowExecuteRunnable implements Callable { * is new process instance */ private boolean isNewProcessInstance() { + if (Flag.YES.equals(processInstance.getRecovery())) { + logger.info("This workInstance will be recover by this execution"); + return false; + } + if (ExecutionStatus.RUNNING_EXECUTION == processInstance.getState() && processInstance.getRunTimes() == 1) { return true; - } else if (processInstance.getRecovery().equals(Flag.YES)) { - // host is empty use old task instance - return false; - } else { - return false; } + logger.info( + "The workflowInstance has been executed before, this execution is to reRun, processInstance status: {}, runTimes: {}", + processInstance.getState(), + processInstance.getRunTimes()); + return false; } public void resubmit(long taskCode) throws Exception { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index fcfe7c67bf..06a72d1c88 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -67,7 +67,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor { return true; } } - dispatchTask(); return true; } @@ -119,7 +118,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName()); return true; } - logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId()); + logger.info("task ready to dispatch to worker: taskInstanceId: {}", taskInstance.getId()); TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(), processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), @@ -167,7 +166,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); killCommand.setTaskInstanceId(taskInstance.getId()); - ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER); + ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance); Host host = Host.of(taskInstance.getHost()); executionContext.setHost(host); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java index 2d4bdfe5da..05a54e8529 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java @@ -17,46 +17,12 @@ package org.apache.dolphinscheduler.server.master.service; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; -import org.apache.dolphinscheduler.server.master.event.StateEvent; -import org.apache.dolphinscheduler.common.enums.StateEventType; -import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; -import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; -import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; -import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; -import org.apache.dolphinscheduler.server.utils.ProcessUtils; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.registry.RegistryClient; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.StopWatch; - -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; -import io.micrometer.core.annotation.Counted; -import io.micrometer.core.annotation.Timed; import lombok.NonNull; /** @@ -65,41 +31,14 @@ import lombok.NonNull; @Component public class FailoverService { private static final Logger LOGGER = LoggerFactory.getLogger(FailoverService.class); - private final RegistryClient registryClient; - private final MasterConfig masterConfig; - private final ProcessService processService; - private final WorkflowExecuteThreadPool workflowExecuteThreadPool; - private final ProcessInstanceExecCacheManager cacheManager; - private final String localAddress; - - public FailoverService(@NonNull RegistryClient registryClient, - @NonNull MasterConfig masterConfig, - @NonNull ProcessService processService, - @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool, - @NonNull ProcessInstanceExecCacheManager cacheManager) { - this.registryClient = registryClient; - this.masterConfig = masterConfig; - this.processService = processService; - this.workflowExecuteThreadPool = workflowExecuteThreadPool; - this.cacheManager = cacheManager; - this.localAddress = NetUtils.getAddr(masterConfig.getListenPort()); - } - /** - * check master failover - */ - @Counted(value = "ds.master.scheduler.failover.check.count") - @Timed(value = "ds.master.scheduler.failover.check.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) - public void checkMasterFailover() { - List hosts = getNeedFailoverMasterServers(); - if (CollectionUtils.isEmpty(hosts)) { - return; - } - LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, hosts); + private final MasterFailoverService masterFailoverService; + private final WorkerFailoverService workerFailoverService; - for (String host : hosts) { - failoverMasterWithLock(host); - } + public FailoverService(@NonNull MasterFailoverService masterFailoverService, + @NonNull WorkerFailoverService workerFailoverService) { + this.masterFailoverService = masterFailoverService; + this.workerFailoverService = workerFailoverService; } /** @@ -111,304 +50,18 @@ public class FailoverService { public void failoverServerWhenDown(String serverHost, NodeType nodeType) { switch (nodeType) { case MASTER: - failoverMasterWithLock(serverHost); + LOGGER.info("Master failover starting, masterServer: {}", serverHost); + masterFailoverService.failoverMaster(serverHost); + LOGGER.info("Master failover finished, masterServer: {}", serverHost); break; case WORKER: - failoverWorker(serverHost); + LOGGER.info("Worker failover staring, workerServer: {}", serverHost); + workerFailoverService.failoverWorker(serverHost); + LOGGER.info("Worker failover finished, workerServer: {}", serverHost); break; default: break; } } - private void failoverMasterWithLock(String masterHost) { - String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost); - try { - registryClient.getLock(failoverPath); - this.failoverMaster(masterHost); - } catch (Exception e) { - LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e); - } finally { - registryClient.releaseLock(failoverPath); - } - } - - /** - * Failover master, will failover process instance and associated task instance. - *

When the process instance belongs to the given masterHost and the restartTime is before the current server start up time, - * then the process instance will be failovered. - * - * @param masterHost master host - */ - private void failoverMaster(String masterHost) { - if (StringUtils.isEmpty(masterHost)) { - return; - } - Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost); - StopWatch failoverTimeCost = StopWatch.createStarted(); - List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); - LOGGER.info("start master[{}] failover, need to failover process list size:{}", masterHost, needFailoverProcessInstanceList.size()); - - // servers need to contain master hosts and worker hosts, otherwise the logic task will failover fail. - List servers = registryClient.getServerList(NodeType.WORKER); - servers.addAll(registryClient.getServerList(NodeType.MASTER)); - - for (ProcessInstance processInstance : needFailoverProcessInstanceList) { - if (Constants.NULL.equals(processInstance.getHost())) { - continue; - } - - List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); - for (TaskInstance taskInstance : validTaskInstanceList) { - LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); - failoverTaskInstance(processInstance, taskInstance, servers); - } - - if (serverStartupTime != null && processInstance.getRestartTime() != null - && processInstance.getRestartTime().after(serverStartupTime)) { - continue; - } - - LOGGER.info("failover process instance id: {}", processInstance.getId()); - ProcessInstanceMetrics.incProcessInstanceFailover(); - //updateProcessInstance host is null and insert into command - processInstance.setHost(Constants.NULL); - processService.processNeedFailoverProcessInstances(processInstance); - } - - failoverTimeCost.stop(); - LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS)); - } - - /** - * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker, - * and failover these tasks. - *

- * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master. - * - * @param workerHost worker host - */ - private void failoverWorker(String workerHost) { - if (StringUtils.isEmpty(workerHost)) { - return; - } - - long startTime = System.currentTimeMillis(); - // we query the task instance from cache, so that we can directly update the cache - final List needFailoverTaskInstanceList = cacheManager.getAll() - .stream() - .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream()) - .filter(taskInstance -> - workerHost.equals(taskInstance.getHost()) && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState())) - .collect(Collectors.toList()); - final Map processInstanceCacheMap = new HashMap<>(); - LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size()); - final List workerServers = registryClient.getServerList(NodeType.WORKER); - for (TaskInstance taskInstance : needFailoverTaskInstanceList) { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId()); - try { - ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId()); - if (processInstance == null) { - processInstance = cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance(); - if (processInstance == null) { - LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", - taskInstance.getProcessInstanceId(), taskInstance.getId()); - continue; - } - processInstanceCacheMap.put(processInstance.getId(), processInstance); - } - - // only failover the task owned myself if worker down. - if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) { - continue; - } - - LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); - failoverTaskInstance(processInstance, taskInstance, workerServers); - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - } - LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime); - } - - /** - * failover task instance - *

- * 1. kill yarn job if run on worker and there are yarn jobs in tasks. - * 2. change task state from running to need failover. - * 3. try to notify local master - * - * @param processInstance - * @param taskInstance - * @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers. - */ - private void failoverTaskInstance(@NonNull ProcessInstance processInstance, TaskInstance taskInstance, List servers) { - if (!checkTaskInstanceNeedFailover(servers, taskInstance)) { - LOGGER.info("The taskInstance doesn't need to failover"); - return; - } - TaskMetrics.incTaskFailover(); - boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType()); - - taskInstance.setProcessInstance(processInstance); - - if (!isMasterTask) { - LOGGER.info("The failover taskInstance is not master task"); - TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); - - if (masterConfig.isKillYarnJobWhenTaskFailover()) { - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskExecutionContext); - } - } else { - LOGGER.info("The failover taskInstance is a master task"); - } - - taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); - processService.saveTaskInstance(taskInstance); - - StateEvent stateEvent = new StateEvent(); - stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - stateEvent.setProcessInstanceId(processInstance.getId()); - stateEvent.setExecutionStatus(taskInstance.getState()); - workflowExecuteThreadPool.submitStateEvent(stateEvent); - } - - /** - * Get need failover master servers. - *

- * Query the process instances from database, if the processInstance's host doesn't exist in registry - * or the host is the currentServer, then it will need to failover. - * - * @return need failover master servers - */ - private List getNeedFailoverMasterServers() { - // failover myself && failover dead masters - List hosts = processService.queryNeedFailoverProcessInstanceHost(); - - Iterator iterator = hosts.iterator(); - while (iterator.hasNext()) { - String host = iterator.next(); - if (registryClient.checkNodeExists(host, NodeType.MASTER)) { - if (!localAddress.equals(host)) { - iterator.remove(); - } - } - } - return hosts; - } - - /** - * task needs failover if task start before server starts - * - * @param servers servers, can container master servers or worker servers - * @param taskInstance task instance - * @return true if task instance need fail over - */ - private boolean checkTaskInstanceNeedFailover(List servers, TaskInstance taskInstance) { - - boolean taskNeedFailover = true; - - if (taskInstance == null) { - LOGGER.error("Master failover task instance error, taskInstance is null"); - return false; - } - - if (Constants.NULL.equals(taskInstance.getHost())) { - return false; - } - - if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) { - return false; - } - - //now no host will execute this task instance,so no need to failover the task - if (taskInstance.getHost() == null) { - return false; - } - - //if task start after server starts, there is no need to failover the task. - if (checkTaskAfterServerStart(servers, taskInstance)) { - taskNeedFailover = false; - } - - return taskNeedFailover; - } - - /** - * check task start after the worker server starts. - * - * @param servers servers, can contain master servers or worker servers - * @param taskInstance task instance - * @return true if task instance start time after server start date - */ - private boolean checkTaskAfterServerStart(List servers, TaskInstance taskInstance) { - if (StringUtils.isEmpty(taskInstance.getHost())) { - return false; - } - Date serverStartDate = getServerStartupTime(servers, taskInstance.getHost()); - if (serverStartDate != null) { - if (taskInstance.getStartTime() == null) { - return taskInstance.getSubmitTime().after(serverStartDate); - } else { - return taskInstance.getStartTime().after(serverStartDate); - } - } - return false; - } - - /** - * get failover lock path - * - * @param nodeType zookeeper node type - * @return fail over lock path - */ - private String getFailoverLockPath(NodeType nodeType, String host) { - switch (nodeType) { - case MASTER: - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host; - case WORKER: - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host; - default: - return ""; - } - } - - /** - * get server startup time - */ - private Date getServerStartupTime(NodeType nodeType, String host) { - if (StringUtils.isEmpty(host)) { - return null; - } - List servers = registryClient.getServerList(nodeType); - return getServerStartupTime(servers, host); - } - - /** - * get server startup time - */ - private Date getServerStartupTime(List servers, String host) { - if (CollectionUtils.isEmpty(servers)) { - return null; - } - Date serverStartupTime = null; - for (Server server : servers) { - if (host.equals(server.getHost() + Constants.COLON + server.getPort())) { - serverStartupTime = server.getCreateTime(); - break; - } - } - return serverStartupTime; - } - - public String getLocalAddress() { - return localAddress; - } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java new file mode 100644 index 0000000000..c7e5b4ea13 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.service; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; +import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; +import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.time.StopWatch; + +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import io.micrometer.core.annotation.Counted; +import io.micrometer.core.annotation.Timed; +import lombok.NonNull; + +@Service +public class MasterFailoverService { + + private static final Logger LOGGER = LoggerFactory.getLogger(MasterFailoverService.class); + private final RegistryClient registryClient; + private final MasterConfig masterConfig; + private final ProcessService processService; + private final String localAddress; + + public MasterFailoverService(@NonNull RegistryClient registryClient, + @NonNull MasterConfig masterConfig, + @NonNull ProcessService processService) { + this.registryClient = registryClient; + this.masterConfig = masterConfig; + this.processService = processService; + this.localAddress = NetUtils.getAddr(masterConfig.getListenPort()); + + } + + /** + * check master failover + */ + @Counted(value = "ds.master.scheduler.failover.check.count") + @Timed(value = "ds.master.scheduler.failover.check.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) + public void checkMasterFailover() { + List needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost() + .stream() + // failover myself || dead server + .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER)) + .distinct() + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(needFailoverMasterHosts)) { + return; + } + LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, needFailoverMasterHosts); + + for (String needFailoverMasterHost : needFailoverMasterHosts) { + failoverMaster(needFailoverMasterHost); + } + } + + public void failoverMaster(String masterHost) { + String failoverPath = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost; + try { + registryClient.getLock(failoverPath); + doFailoverMaster(masterHost); + } catch (Exception e) { + LOGGER.error("Master server failover failed, host:{}", masterHost, e); + } finally { + registryClient.releaseLock(failoverPath); + } + } + + /** + * Failover master, will failover process instance and associated task instance. + *

When the process instance belongs to the given masterHost and the restartTime is before the current server start up time, + * then the process instance will be failovered. + * + * @param masterHost master host + */ + private void doFailoverMaster(@NonNull String masterHost) { + LOGGER.info("Master[{}] failover starting, need to failover process", masterHost); + StopWatch failoverTimeCost = StopWatch.createStarted(); + + Optional masterStartupTimeOptional = + getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost); + List needFailoverProcessInstanceList = + processService.queryNeedFailoverProcessInstances(masterHost); + + LOGGER.info( + "Master[{}] failover there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}", + masterHost, + needFailoverProcessInstanceList.size(), + needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList())); + + for (ProcessInstance processInstance : needFailoverProcessInstanceList) { + try { + LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + LOGGER.info("WorkflowInstance failover starting"); + if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) { + LOGGER.info("WorkflowInstance doesn't need to failover"); + continue; + } + int processInstanceId = processInstance.getId(); + List taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId); + for (TaskInstance taskInstance : taskInstanceList) { + try { + LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId()); + LOGGER.info("TaskInstance failover starting"); + if (!checkTaskInstanceNeedFailover(taskInstance)) { + LOGGER.info("The taskInstance doesn't need to failover"); + continue; + } + failoverTaskInstance(processInstance, taskInstance); + LOGGER.info("TaskInstance failover finished"); + } finally { + LoggerUtils.removeTaskInstanceIdMDC(); + } + } + + ProcessInstanceMetrics.incProcessInstanceFailover(); + //updateProcessInstance host is null to mark this processInstance has been failover + // and insert a failover command + processInstance.setHost(Constants.NULL); + processService.processNeedFailoverProcessInstances(processInstance); + LOGGER.info("WorkflowInstance failover finished"); + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); + } + } + + failoverTimeCost.stop(); + LOGGER.info("Master[{}] failover finished, useTime:{}ms", + masterHost, + failoverTimeCost.getTime(TimeUnit.MILLISECONDS)); + } + + private Optional getServerStartupTime(List servers, String host) { + if (CollectionUtils.isEmpty(servers)) { + return Optional.empty(); + } + Date serverStartupTime = null; + for (Server server : servers) { + if (host.equals(server.getHost() + Constants.COLON + server.getPort())) { + serverStartupTime = server.getCreateTime(); + break; + } + } + return Optional.ofNullable(serverStartupTime); + } + + /** + * failover task instance + *

+ * 1. kill yarn job if run on worker and there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. try to notify local master + * + * @param processInstance + * @param taskInstance + */ + private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { + TaskMetrics.incTaskFailover(); + boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType()); + + taskInstance.setProcessInstance(processInstance); + + if (!isMasterTask) { + LOGGER.info("The failover taskInstance is not master task"); + TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); + + if (masterConfig.isKillYarnJobWhenTaskFailover()) { + // only kill yarn job if exists , the local thread has exited + LOGGER.info("TaskInstance failover begin kill the task related yarn job"); + ProcessUtils.killYarnJob(taskExecutionContext); + } + } else { + LOGGER.info("The failover taskInstance is a master task"); + } + + taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); + taskInstance.setFlag(Flag.NO); + processService.saveTaskInstance(taskInstance); + } + + private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) { + if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) { + // The task is already finished, so we don't need to failover this task instance + return false; + } + return true; + } + + private boolean checkProcessInstanceNeedFailover(Optional beFailoveredMasterStartupTimeOptional, + @NonNull ProcessInstance processInstance) { + // The process has already been failover, since when we do master failover we will hold a lock, so we can guarantee + // the host will not be set concurrent. + if (Constants.NULL.equals(processInstance.getHost())) { + return false; + } + if (!beFailoveredMasterStartupTimeOptional.isPresent()) { + // the master is not active, we can failover all it's processInstance + return true; + } + Date beFailoveredMasterStartupTime = beFailoveredMasterStartupTimeOptional.get(); + + if (processInstance.getStartTime().after(beFailoveredMasterStartupTime)) { + // The processInstance is newly created + return false; + } + + return true; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java new file mode 100644 index 0000000000..ec126a3ec3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.service; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.event.StateEvent; +import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; +import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import lombok.NonNull; + +@Service +public class WorkerFailoverService { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkerFailoverService.class); + + private final RegistryClient registryClient; + private final MasterConfig masterConfig; + private final ProcessService processService; + private final WorkflowExecuteThreadPool workflowExecuteThreadPool; + private final ProcessInstanceExecCacheManager cacheManager; + private final String localAddress; + + public WorkerFailoverService(@NonNull RegistryClient registryClient, + @NonNull MasterConfig masterConfig, + @NonNull ProcessService processService, + @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool, + @NonNull ProcessInstanceExecCacheManager cacheManager) { + this.registryClient = registryClient; + this.masterConfig = masterConfig; + this.processService = processService; + this.workflowExecuteThreadPool = workflowExecuteThreadPool; + this.cacheManager = cacheManager; + this.localAddress = NetUtils.getAddr(masterConfig.getListenPort()); + } + + /** + * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker, + * and failover these tasks. + *

+ * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master. + * + * @param workerHost worker host + */ + public void failoverWorker(@NonNull String workerHost) { + LOGGER.info("Worker[{}] failover starting", workerHost); + final StopWatch failoverTimeCost = StopWatch.createStarted(); + + // we query the task instance from cache, so that we can directly update the cache + final Optional needFailoverWorkerStartTime = + getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost); + + final List needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost); + if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) { + LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost); + return; + } + LOGGER.info( + "Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}", + workerHost, + needFailoverTaskInstanceList.size(), + needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList())); + final Map processInstanceCacheMap = new HashMap<>(); + for (TaskInstance taskInstance : needFailoverTaskInstanceList) { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId()); + try { + ProcessInstance processInstance = + processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k -> { + WorkflowExecuteRunnable workflowExecuteRunnable = + cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()); + if (workflowExecuteRunnable == null) { + return null; + } + return workflowExecuteRunnable.getProcessInstance(); + }); + if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) { + LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost); + continue; + } + LOGGER.info( + "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE", + workerHost); + failoverTaskInstance(processInstance, taskInstance); + LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost); + } catch (Exception ex) { + LOGGER.info("Worker[{}] failover taskInstance occur exception", workerHost, ex); + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + } + failoverTimeCost.stop(); + LOGGER.info("Worker[{}] failover finished, useTime:{}ms", + workerHost, + failoverTimeCost.getTime(TimeUnit.MILLISECONDS)); + } + + /** + * failover task instance + *

+ * 1. kill yarn job if run on worker and there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. try to notify local master + * + * @param processInstance + * @param taskInstance + */ + private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { + + TaskMetrics.incTaskFailover(); + boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType()); + + taskInstance.setProcessInstance(processInstance); + + if (!isMasterTask) { + LOGGER.info("The failover taskInstance is not master task"); + TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); + + if (masterConfig.isKillYarnJobWhenTaskFailover()) { + // only kill yarn job if exists , the local thread has exited + LOGGER.info("TaskInstance failover begin kill the task related yarn job"); + ProcessUtils.killYarnJob(taskExecutionContext); + } + } else { + LOGGER.info("The failover taskInstance is a master task"); + } + + taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); + taskInstance.setFlag(Flag.NO); + processService.saveTaskInstance(taskInstance); + + StateEvent stateEvent = new StateEvent(); + stateEvent.setTaskInstanceId(taskInstance.getId()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + stateEvent.setProcessInstanceId(processInstance.getId()); + stateEvent.setExecutionStatus(taskInstance.getState()); + workflowExecuteThreadPool.submitStateEvent(stateEvent); + } + + /** + * task needs failover if task start before server starts + * + * @return true if task instance need fail over + */ + private boolean checkTaskInstanceNeedFailover(Optional needFailoverWorkerStartTime, + @Nullable ProcessInstance processInstance, + TaskInstance taskInstance) { + if (processInstance == null) { + // This case should be happened. + LOGGER.error( + "Failover task instance error, cannot find the related processInstance form memory, this case shouldn't happened"); + return false; + } + if (taskInstance == null) { + // This case should be happened. + LOGGER.error("Master failover task instance error, taskInstance is null, this case shouldn't happened"); + return false; + } + // only failover the task owned myself if worker down. + if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) { + LOGGER.error( + "Master failover task instance error, the taskInstance's processInstance's host: {} is not the current master: {}", + processInstance.getHost(), + localAddress); + return false; + } + if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) { + // The taskInstance is already finished, doesn't need to failover + LOGGER.info("The task is already finished, doesn't need to failover"); + return false; + } + if (!needFailoverWorkerStartTime.isPresent()) { + // The worker is still down + return true; + } + // The worker is active, may already send some new task to it + if (taskInstance.getSubmitTime() != null && taskInstance.getSubmitTime() + .after(needFailoverWorkerStartTime.get())) { + LOGGER.info( + "The taskInstance's submitTime: {} is after the need failover worker's start time: {}, the taskInstance is newly submit, it doesn't need to failover", + taskInstance.getSubmitTime(), + needFailoverWorkerStartTime.get()); + return false; + } + + return true; + } + + private List getNeedFailoverTaskInstance(@NonNull String failoverWorkerHost) { + // we query the task instance from cache, so that we can directly update the cache + return cacheManager.getAll() + .stream() + .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream()) + // If the worker is in dispatching and the host is not set + .filter(taskInstance -> failoverWorkerHost.equals(taskInstance.getHost()) + && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState())) + .collect(Collectors.toList()); + } + + private Optional getServerStartupTime(List servers, String host) { + if (CollectionUtils.isEmpty(servers)) { + return Optional.empty(); + } + Date serverStartupTime = null; + for (Server server : servers) { + if (host.equals(server.getHost() + Constants.COLON + server.getPort())) { + serverStartupTime = server.getCreateTime(); + break; + } + } + return Optional.ofNullable(serverStartupTime); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java index b2c7d63cd7..55dd236f71 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java @@ -52,7 +52,7 @@ public class ExecutionContextTestUtils { TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context); Command command = requestCommand.convert2Command(); - ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER); + ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER, taskInstance); executionContext.setHost(Host.of(NetUtils.getAddr(port))); return executionContext; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java index fdd79552b1..1ee890faf2 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java @@ -70,7 +70,7 @@ public class NettyExecutorManagerTest { .buildProcessInstanceRelatedInfo(processInstance) .buildProcessDefinitionRelatedInfo(processDefinition) .create(); - ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER); + ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance); executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort()))); Boolean execute = nettyExecutorManager.execute(executionContext); Assert.assertTrue(execute); @@ -89,7 +89,7 @@ public class NettyExecutorManagerTest { .buildProcessInstanceRelatedInfo(processInstance) .buildProcessDefinitionRelatedInfo(processDefinition) .create(); - ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER); + ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance); executionContext.setHost(Host.of(NetUtils.getAddr(4444))); nettyExecutorManager.execute(executionContext); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index 98bf514730..44e5a382f6 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -26,13 +26,14 @@ import static org.mockito.Mockito.doNothing; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.NodeType; -import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -98,9 +99,17 @@ public class FailoverServiceTest { springApplicationContext.setApplicationContext(applicationContext); given(masterConfig.getListenPort()).willReturn(masterPort); - failoverService = new FailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager); + MasterFailoverService masterFailoverService = + new MasterFailoverService(registryClient, masterConfig, processService); + WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient, + masterConfig, + processService, + workflowExecuteThreadPool, + cacheManager); + + failoverService = new FailoverService(masterFailoverService, workerFailoverService); - testMasterHost = failoverService.getLocalAddress(); + testMasterHost = NetUtils.getAddr(masterConfig.getListenPort()); String ip = testMasterHost.split(":")[0]; int port = Integer.valueOf(testMasterHost.split(":")[1]); Assert.assertEquals(masterPort, port); @@ -118,6 +127,7 @@ public class FailoverServiceTest { processInstance = new ProcessInstance(); processInstance.setId(1); processInstance.setHost(testMasterHost); + processInstance.setStartTime(new Date()); processInstance.setRestartTime(new Date()); processInstance.setHistoryCmd("xxx"); processInstance.setCommandType(CommandType.STOP); @@ -154,16 +164,10 @@ public class FailoverServiceTest { given(registryClient.getServerList(NodeType.WORKER)).willReturn(new ArrayList<>(Arrays.asList(workerServer))); given(registryClient.getServerList(NodeType.MASTER)).willReturn(new ArrayList<>(Arrays.asList(masterServer))); - ReflectionTestUtils.setField(failoverService, "registryClient", registryClient); doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class)); } - @Test - public void checkMasterFailoverTest() { - failoverService.checkMasterFailover(); - } - @Test public void failoverMasterTest() { processInstance.setHost(Constants.NULL); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index f6eab1befa..fd33eb115a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.DagData; import org.apache.dolphinscheduler.dao.entity.DataSource; @@ -62,7 +63,8 @@ import org.springframework.transaction.annotation.Transactional; public interface ProcessService { @Transactional - ProcessInstance handleCommand(String host, Command command) throws CronParseException; + ProcessInstance handleCommand(String host, Command command) + throws CronParseException, CodeGenerateUtils.CodeGenerateException; void moveToErrorCommand(Command command, String message); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index f9d0782f95..5024a944f1 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -128,7 +128,6 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.ServiceException; @@ -276,6 +275,9 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private CuringParamsService curingGlobalParamsService; + @Autowired + private ProcessService processService; + /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @@ -285,7 +287,8 @@ public class ProcessServiceImpl implements ProcessService { */ @Override @Transactional - public ProcessInstance handleCommand(String host, Command command) throws CronParseException { + public ProcessInstance handleCommand(String host, Command command) throws CronParseException, + CodeGenerateException { ProcessInstance processInstance = constructProcessInstance(command, host); // cannot construct process instance, return null if (processInstance == null) { @@ -762,7 +765,7 @@ public class ProcessServiceImpl implements ProcessService { */ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, Command command, - Map cmdParam) { + Map cmdParam) throws CodeGenerateException { ProcessInstance processInstance = new ProcessInstance(processDefinition); processInstance.setProcessDefinitionCode(processDefinition.getCode()); processInstance.setProcessDefinitionVersion(processDefinition.getVersion()); @@ -922,7 +925,8 @@ public class ProcessServiceImpl implements ProcessService { * @param host host * @return process instance */ - protected ProcessInstance constructProcessInstance(Command command, String host) throws CronParseException { + protected ProcessInstance constructProcessInstance(Command command, String host) + throws CronParseException, CodeGenerateException { ProcessInstance processInstance; ProcessDefinition processDefinition; CommandType commandType = command.getCommandType(); @@ -1028,6 +1032,7 @@ public class ProcessServiceImpl implements ProcessService { case RECOVER_TOLERANCE_FAULT_PROCESS: // recover tolerance fault process processInstance.setRecovery(Flag.YES); + processInstance.setRunTimes(runTime + 1); runStatus = processInstance.getState(); break; case COMPLEMENT_DATA: @@ -1273,11 +1278,15 @@ public class ProcessServiceImpl implements ProcessService { while (retryTimes <= commitRetryTimes) { try { // submit task to db - task = SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance, taskInstance); + // Only want to use transaction here + task = processService.submitTask(processInstance, taskInstance); if (task != null && task.getId() != 0) { break; } - logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); + logger.error( + "task commit to db failed , taskId {} has already retry {} times, please check the database", + taskInstance.getId(), + retryTimes); Thread.sleep(commitInterval); } catch (Exception e) { logger.error("task commit to db failed", e); @@ -1299,13 +1308,17 @@ public class ProcessServiceImpl implements ProcessService { @Override @Transactional public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) { - logger.info("start submit task : {}, processInstance id:{}, state: {}", - taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); + logger.info("Start save taskInstance to database : {}, processInstance id:{}, state: {}", + taskInstance.getName(), + taskInstance.getProcessInstanceId(), + processInstance.getState()); //submit to db TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); if (task == null) { - logger.error("end submit task to db error, task name:{}, process id:{} state: {} ", - taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); + logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ", + taskInstance.getName(), + taskInstance.getProcessInstance(), + processInstance.getState()); return null; } @@ -1313,8 +1326,13 @@ public class ProcessServiceImpl implements ProcessService { createSubWorkProcess(processInstance, task); } - logger.info("end submit task to db successfully:{} {} state:{} complete, instance id:{} state: {} ", - taskInstance.getId(), taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); + logger.info( + "End save taskInstance to db successfully:{}, taskInstanceName: {}, taskInstance state:{}, processInstanceId:{}, processInstanceState: {}", + taskInstance.getId(), + taskInstance.getName(), + task.getState(), + processInstance.getId(), + processInstance.getState()); return task; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java index 0aeec4609d..823ec81d3c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java @@ -211,16 +211,45 @@ public class TaskPriority implements Comparable { } TaskPriority that = (TaskPriority) o; return processInstancePriority == that.processInstancePriority - && processInstanceId == that.processInstanceId - && taskInstancePriority == that.taskInstancePriority - && taskId == that.taskId - && taskGroupPriority == that.taskGroupPriority - && Objects.equals(groupName, that.groupName); + && processInstanceId == that.processInstanceId + && taskInstancePriority == that.taskInstancePriority + && taskId == that.taskId + && taskGroupPriority == that.taskGroupPriority + && Objects.equals(groupName, that.groupName); } @Override public int hashCode() { - return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, taskGroupPriority, groupName); + return Objects.hash(processInstancePriority, + processInstanceId, + taskInstancePriority, + taskId, + taskGroupPriority, + groupName); } + @Override + public String toString() { + return "TaskPriority{" + + "processInstancePriority=" + + processInstancePriority + + ", processInstanceId=" + + processInstanceId + + ", taskInstancePriority=" + + taskInstancePriority + + ", taskId=" + + taskId + + ", taskExecutionContext=" + + taskExecutionContext + + ", groupName='" + + groupName + + '\'' + + ", context=" + + context + + ", checkpoint=" + + checkpoint + + ", taskGroupPriority=" + + taskGroupPriority + + '}'; + } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 26b9e66045..f655e3b2af 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; @@ -285,7 +286,7 @@ public class ProcessServiceTest { } @Test - public void testHandleCommand() throws CronParseException { + public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException { //cannot construct process instance, return null; String host = "127.0.0.1"; @@ -462,7 +463,7 @@ public class ProcessServiceTest { } @Test(expected = ServiceException.class) - public void testDeleteNotExistCommand() throws CronParseException { + public void testDeleteNotExistCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException { String host = "127.0.0.1"; int definitionVersion = 1; long definitionCode = 123;