diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 7fd373166f..7619535e15 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -48,7 +48,9 @@ import org.apache.commons.lang.StringUtils; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -194,7 +196,7 @@ public class MasterRegistryClient { failoverMaster(serverHost); break; case WORKER: - failoverWorker(serverHost, true, true); + failoverWorker(serverHost); break; default: break; @@ -275,79 +277,114 @@ public class MasterRegistryClient { * 3. failover all tasks when workerHost is null * * @param workerHost worker host - * @param needCheckWorkerAlive need check worker alive - * @param checkOwner need check process instance owner */ - private void failoverWorker(String workerHost, boolean needCheckWorkerAlive, boolean checkOwner) { - logger.info("start worker[{}] failover ...", workerHost); + private void failoverWorker(String workerHost) { + if (StringUtils.isEmpty(workerHost)) { + return; + } + + long startTime = System.currentTimeMillis(); List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); - for (TaskInstance taskInstance : needFailoverTaskInstanceList) { - if (needCheckWorkerAlive) { - if (!checkTaskInstanceNeedFailover(taskInstance)) { - continue; - } - } + Map processInstanceCacheMap = new HashMap<>(); + logger.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size()); - ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - if (workerHost == null - || !checkOwner - || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { - // only failover the task owned myself if worker down. + for (TaskInstance taskInstance : needFailoverTaskInstanceList) { + ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId()); + if (processInstance == null) { + processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); if (processInstance == null) { - logger.error("failover error, the process {} of task {} do not exists.", + logger.error("failover task instance error, processInstance {} of taskInstance {} is null", taskInstance.getProcessInstanceId(), taskInstance.getId()); continue; } - taskInstance.setProcessInstance(processInstance); - - TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskExecutionContext); - - taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); - processService.saveTaskInstance(taskInstance); - - if (!processInstanceExecCacheManager.contains(processInstance.getId())) { - continue; - } - WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId()); - StateEvent stateEvent = new StateEvent(); - stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - stateEvent.setProcessInstanceId(processInstance.getId()); - stateEvent.setExecutionStatus(taskInstance.getState()); - workflowExecuteThreadNotify.addStateEvent(stateEvent); + processInstanceCacheMap.put(processInstance.getId(), processInstance); } + // only failover the task owned myself if worker down. + if (processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { + logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); + failoverTaskInstance(processInstance, taskInstance); + } } - logger.info("end worker[{}] failover ...", workerHost); + logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime); } /** - * failover master tasks + * failover master + *

+ * failover process instance and associated task instance * * @param masterHost master host */ private void failoverMaster(String masterHost) { - logger.info("start master failover ..."); + if (StringUtils.isEmpty(masterHost)) { + return; + } + long startTime = System.currentTimeMillis(); List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); + logger.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size()); - logger.info("failover process list size:{} ", needFailoverProcessInstanceList.size()); - //updateProcessInstance host is null and insert into command for (ProcessInstance processInstance : needFailoverProcessInstanceList) { - logger.info("failover process instance id: {} host:{}", processInstance.getId(), processInstance.getHost()); if (Constants.NULL.equals(processInstance.getHost())) { continue; } + + logger.info("failover process instance id: {}", processInstance.getId()); + + List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); + for (TaskInstance taskInstance : validTaskInstanceList) { + if (Constants.NULL.equals(taskInstance.getHost())) { + continue; + } + logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); + failoverTaskInstance(processInstance, taskInstance); + } + //updateProcessInstance host is null and insert into command processService.processNeedFailoverProcessInstances(processInstance); } - failoverWorker(masterHost, true, false); - logger.info("master failover end"); + logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime); + } + + private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) { + if (taskInstance == null) { + logger.error("failover task instance error, taskInstance is null"); + return; + } + + if (processInstance == null) { + logger.error("failover task instance error, processInstance {} of taskInstance {} is null", + taskInstance.getProcessInstanceId(), taskInstance.getId()); + return; + } + + if (!checkTaskInstanceNeedFailover(taskInstance)) { + return; + } + + taskInstance.setProcessInstance(processInstance); + TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); + + // only kill yarn job if exists , the local thread has exited + ProcessUtils.killYarnJob(taskExecutionContext); + + taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); + processService.saveTaskInstance(taskInstance); + + WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId()); + if (workflowExecuteThreadNotify == null) { + return; + } + StateEvent stateEvent = new StateEvent(); + stateEvent.setTaskInstanceId(taskInstance.getId()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + stateEvent.setProcessInstanceId(processInstance.getId()); + stateEvent.setExecutionStatus(taskInstance.getState()); + workflowExecuteThreadNotify.addStateEvent(stateEvent); } /**