From aa51c66d9196997bfdfd6cb42be07311652fa0e9 Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Tue, 17 May 2022 16:36:24 +0800 Subject: [PATCH] [Bug][Master] fix master task failover (#10065) * fix master task failover * ui (cherry picked from commit 0cc0ee77faabf1bed495a55ccd7653c03c26a550) --- .../runner/task/TaskProcessorFactory.java | 9 ++ .../master/service/FailoverService.java | 63 ++++++----- .../master/service/FailoverServiceTest.java | 103 ++++++++++++------ 3 files changed, 114 insertions(+), 61 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java index 2b9e9d644a..0129338649 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java @@ -52,4 +52,13 @@ public class TaskProcessorFactory { return iTaskProcessor.getClass().newInstance(); } + + /** + * if match master processor, then this task type is processed on the master + * @param type + * @return + */ + public static boolean isMasterTask(String type) { + return PROCESS_MAP.containsKey(type); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java index 4f25c78dad..2557b659e0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; @@ -127,7 +128,11 @@ public class FailoverService { long startTime = System.currentTimeMillis(); List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size()); - List workerServers = registryClient.getServerList(NodeType.WORKER); + + // servers need to contains master hosts and worker hosts, otherwise the logic task will failover fail. + List servers = registryClient.getServerList(NodeType.WORKER); + servers.addAll(registryClient.getServerList(NodeType.MASTER)); + for (ProcessInstance processInstance : needFailoverProcessInstanceList) { if (Constants.NULL.equals(processInstance.getHost())) { continue; @@ -136,7 +141,7 @@ public class FailoverService { List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); for (TaskInstance taskInstance : validTaskInstanceList) { LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); - failoverTaskInstance(processInstance, taskInstance, workerServers); + failoverTaskInstance(processInstance, taskInstance, servers); } if (serverStartupTime != null && processInstance.getRestartTime() != null @@ -198,29 +203,37 @@ public class FailoverService { /** * failover task instance *

- * 1. kill yarn job if there are yarn jobs in tasks. + * 1. kill yarn job if run on worker and there are yarn jobs in tasks. * 2. change task state from running to need failover. * 3. try to notify local master + * @param processInstance + * @param taskInstance + * @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers. */ - private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List workerServers) { + private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List servers) { if (processInstance == null) { LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", taskInstance.getProcessInstanceId(), taskInstance.getId()); return; } - if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) { + if (!checkTaskInstanceNeedFailover(servers, taskInstance)) { return; } + boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType()); + taskInstance.setProcessInstance(processInstance); - TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); - - if (masterConfig.isKillYarnJobWhenTaskFailover()) { - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskExecutionContext); + + if (!isMasterTask) { + TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); + + if (masterConfig.isKillYarnJobWhenTaskFailover()) { + // only kill yarn job if exists , the local thread has exited + ProcessUtils.killYarnJob(taskExecutionContext); + } } taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); @@ -256,13 +269,13 @@ public class FailoverService { } /** - * task needs failover if task start before worker starts + * task needs failover if task start before server starts * - * @param workerServers worker servers + * @param servers servers, can container master servers or worker servers * @param taskInstance task instance * @return true if task instance need fail over */ - private boolean checkTaskInstanceNeedFailover(List workerServers, TaskInstance taskInstance) { + private boolean checkTaskInstanceNeedFailover(List servers, TaskInstance taskInstance) { boolean taskNeedFailover = true; @@ -279,14 +292,13 @@ public class FailoverService { return false; } - //now no host will execute this task instance,so no need to failover the task if (taskInstance.getHost() == null) { return false; } - //if task start after worker starts, there is no need to failover the task. - if (checkTaskAfterWorkerStart(workerServers, taskInstance)) { + //if task start after server starts, there is no need to failover the task. + if (checkTaskAfterServerStart(servers, taskInstance)) { taskNeedFailover = false; } @@ -296,19 +308,20 @@ public class FailoverService { /** * check task start after the worker server starts. * + * @param servers servers, can contain master servers or worker servers * @param taskInstance task instance - * @return true if task instance start time after worker server start date + * @return true if task instance start time after server start date */ - private boolean checkTaskAfterWorkerStart(List workerServers, TaskInstance taskInstance) { + private boolean checkTaskAfterServerStart(List servers, TaskInstance taskInstance) { if (StringUtils.isEmpty(taskInstance.getHost())) { return false; } - Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost()); - if (workerServerStartDate != null) { + Date serverStartDate = getServerStartupTime(servers, taskInstance.getHost()); + if (serverStartDate != null) { if (taskInstance.getStartTime() == null) { - return taskInstance.getSubmitTime().after(workerServerStartDate); + return taskInstance.getSubmitTime().after(serverStartDate); } else { - return taskInstance.getStartTime().after(workerServerStartDate); + return taskInstance.getStartTime().after(serverStartDate); } } return false; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index 17a3798090..4e3d99347a 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.server.master.service; +import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; @@ -30,9 +33,11 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -46,6 +51,7 @@ import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.context.ApplicationContext; import org.springframework.test.util.ReflectionTestUtils; import com.google.common.collect.Lists; @@ -72,22 +78,34 @@ public class FailoverServiceTest { @Mock private WorkflowExecuteThreadPool workflowExecuteThreadPool; - private String testHost; + private static int masterPort = 5678; + private static int workerPort = 1234; + + private String testMasterHost; + private String testWorkerHost; private ProcessInstance processInstance; - private TaskInstance taskInstance; + private TaskInstance masterTaskInstance; + private TaskInstance workerTaskInstance; @Before public void before() throws Exception { - given(masterConfig.getListenPort()).willReturn(8080); + // init spring context + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + + given(masterConfig.getListenPort()).willReturn(masterPort); - testHost = failoverService.getLocalAddress(); - String ip = testHost.split(":")[0]; - int port = Integer.valueOf(testHost.split(":")[1]); - Assert.assertEquals(8080, port); + testMasterHost = failoverService.getLocalAddress(); + String ip = testMasterHost.split(":")[0]; + int port = Integer.valueOf(testMasterHost.split(":")[1]); + Assert.assertEquals(masterPort, port); + + testWorkerHost = ip + ":" + workerPort; given(registryClient.getLock(Mockito.anyString())).willReturn(true); given(registryClient.releaseLock(Mockito.anyString())).willReturn(true); - given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testHost); + given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testMasterHost); given(registryClient.getStoppable()).willReturn(cause -> { }); given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true); @@ -95,30 +113,43 @@ public class FailoverServiceTest { processInstance = new ProcessInstance(); processInstance.setId(1); - processInstance.setHost(testHost); + processInstance.setHost(testMasterHost); processInstance.setRestartTime(new Date()); processInstance.setHistoryCmd("xxx"); processInstance.setCommandType(CommandType.STOP); - taskInstance = new TaskInstance(); - taskInstance.setId(1); - taskInstance.setStartTime(new Date()); - taskInstance.setHost(testHost); + masterTaskInstance = new TaskInstance(); + masterTaskInstance.setId(1); + masterTaskInstance.setStartTime(new Date()); + masterTaskInstance.setHost(testMasterHost); + masterTaskInstance.setTaskType(TASK_TYPE_SWITCH); + + workerTaskInstance = new TaskInstance(); + workerTaskInstance.setId(2); + workerTaskInstance.setStartTime(new Date()); + workerTaskInstance.setHost(testWorkerHost); + workerTaskInstance.setTaskType(COMMON_TASK_TYPE); - given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(taskInstance)); - given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testHost)); + given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(masterTaskInstance, workerTaskInstance)); + given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testMasterHost)); given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance)); doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class)); - given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(taskInstance)); + given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance)); given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance); Thread.sleep(1000); - Server server = new Server(); - server.setHost(ip); - server.setPort(port); - server.setCreateTime(new Date()); - given(registryClient.getServerList(NodeType.WORKER)).willReturn(Arrays.asList(server)); - given(registryClient.getServerList(NodeType.MASTER)).willReturn(Arrays.asList(server)); + Server masterServer = new Server(); + masterServer.setHost(ip); + masterServer.setPort(masterPort); + masterServer.setCreateTime(new Date()); + + Server workerServer = new Server(); + workerServer.setHost(ip); + workerServer.setPort(workerPort); + workerServer.setCreateTime(new Date()); + + given(registryClient.getServerList(NodeType.WORKER)).willReturn(new ArrayList<>(Arrays.asList(workerServer))); + given(registryClient.getServerList(NodeType.MASTER)).willReturn(new ArrayList<>(Arrays.asList(masterServer))); ReflectionTestUtils.setField(failoverService, "registryClient", registryClient); doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class)); @@ -132,26 +163,26 @@ public class FailoverServiceTest { @Test public void failoverMasterTest() { processInstance.setHost(Constants.NULL); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - failoverService.failoverServerWhenDown(testHost, NodeType.MASTER); - Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); - - processInstance.setHost(testHost); - taskInstance.setState(ExecutionStatus.SUCCESS); - failoverService.failoverServerWhenDown(testHost, NodeType.MASTER); - Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); + masterTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER); + Assert.assertNotEquals(masterTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); + + processInstance.setHost(testMasterHost); + masterTaskInstance.setState(ExecutionStatus.SUCCESS); + failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER); + Assert.assertNotEquals(masterTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); Assert.assertEquals(Constants.NULL, processInstance.getHost()); - processInstance.setHost(testHost); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - failoverService.failoverServerWhenDown(testHost, NodeType.MASTER); - Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); + processInstance.setHost(testMasterHost); + masterTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER); + Assert.assertEquals(masterTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); Assert.assertEquals(Constants.NULL, processInstance.getHost()); } @Test public void failoverWorkTest() { - failoverService.failoverServerWhenDown(testHost, NodeType.WORKER); - Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); + failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER); + Assert.assertEquals(workerTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); } }