|
|
|
@ -17,6 +17,9 @@
|
|
|
|
|
|
|
|
|
|
package org.apache.dolphinscheduler.server.master.service; |
|
|
|
|
|
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; |
|
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; |
|
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; |
|
|
|
|
import static org.mockito.BDDMockito.given; |
|
|
|
|
import static org.mockito.Mockito.doNothing; |
|
|
|
|
|
|
|
|
@ -30,9 +33,11 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; |
|
|
|
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
|
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService; |
|
|
|
|
import org.apache.dolphinscheduler.service.registry.RegistryClient; |
|
|
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Arrays; |
|
|
|
|
import java.util.Date; |
|
|
|
|
|
|
|
|
@ -46,6 +51,7 @@ import org.mockito.Mockito;
|
|
|
|
|
import org.powermock.core.classloader.annotations.PowerMockIgnore; |
|
|
|
|
import org.powermock.core.classloader.annotations.PrepareForTest; |
|
|
|
|
import org.powermock.modules.junit4.PowerMockRunner; |
|
|
|
|
import org.springframework.context.ApplicationContext; |
|
|
|
|
import org.springframework.test.util.ReflectionTestUtils; |
|
|
|
|
|
|
|
|
|
import com.google.common.collect.Lists; |
|
|
|
@ -72,22 +78,34 @@ public class FailoverServiceTest {
|
|
|
|
|
@Mock |
|
|
|
|
private WorkflowExecuteThreadPool workflowExecuteThreadPool; |
|
|
|
|
|
|
|
|
|
private String testHost; |
|
|
|
|
private static int masterPort = 5678; |
|
|
|
|
private static int workerPort = 1234; |
|
|
|
|
|
|
|
|
|
private String testMasterHost; |
|
|
|
|
private String testWorkerHost; |
|
|
|
|
private ProcessInstance processInstance; |
|
|
|
|
private TaskInstance taskInstance; |
|
|
|
|
private TaskInstance masterTaskInstance; |
|
|
|
|
private TaskInstance workerTaskInstance; |
|
|
|
|
|
|
|
|
|
@Before |
|
|
|
|
public void before() throws Exception { |
|
|
|
|
given(masterConfig.getListenPort()).willReturn(8080); |
|
|
|
|
// init spring context
|
|
|
|
|
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); |
|
|
|
|
SpringApplicationContext springApplicationContext = new SpringApplicationContext(); |
|
|
|
|
springApplicationContext.setApplicationContext(applicationContext); |
|
|
|
|
|
|
|
|
|
given(masterConfig.getListenPort()).willReturn(masterPort); |
|
|
|
|
|
|
|
|
|
testHost = failoverService.getLocalAddress(); |
|
|
|
|
String ip = testHost.split(":")[0]; |
|
|
|
|
int port = Integer.valueOf(testHost.split(":")[1]); |
|
|
|
|
Assert.assertEquals(8080, port); |
|
|
|
|
testMasterHost = failoverService.getLocalAddress(); |
|
|
|
|
String ip = testMasterHost.split(":")[0]; |
|
|
|
|
int port = Integer.valueOf(testMasterHost.split(":")[1]); |
|
|
|
|
Assert.assertEquals(masterPort, port); |
|
|
|
|
|
|
|
|
|
testWorkerHost = ip + ":" + workerPort; |
|
|
|
|
|
|
|
|
|
given(registryClient.getLock(Mockito.anyString())).willReturn(true); |
|
|
|
|
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true); |
|
|
|
|
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testHost); |
|
|
|
|
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testMasterHost); |
|
|
|
|
given(registryClient.getStoppable()).willReturn(cause -> { |
|
|
|
|
}); |
|
|
|
|
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true); |
|
|
|
@ -95,30 +113,43 @@ public class FailoverServiceTest {
|
|
|
|
|
|
|
|
|
|
processInstance = new ProcessInstance(); |
|
|
|
|
processInstance.setId(1); |
|
|
|
|
processInstance.setHost(testHost); |
|
|
|
|
processInstance.setHost(testMasterHost); |
|
|
|
|
processInstance.setRestartTime(new Date()); |
|
|
|
|
processInstance.setHistoryCmd("xxx"); |
|
|
|
|
processInstance.setCommandType(CommandType.STOP); |
|
|
|
|
|
|
|
|
|
taskInstance = new TaskInstance(); |
|
|
|
|
taskInstance.setId(1); |
|
|
|
|
taskInstance.setStartTime(new Date()); |
|
|
|
|
taskInstance.setHost(testHost); |
|
|
|
|
masterTaskInstance = new TaskInstance(); |
|
|
|
|
masterTaskInstance.setId(1); |
|
|
|
|
masterTaskInstance.setStartTime(new Date()); |
|
|
|
|
masterTaskInstance.setHost(testMasterHost); |
|
|
|
|
masterTaskInstance.setTaskType(TASK_TYPE_SWITCH); |
|
|
|
|
|
|
|
|
|
workerTaskInstance = new TaskInstance(); |
|
|
|
|
workerTaskInstance.setId(2); |
|
|
|
|
workerTaskInstance.setStartTime(new Date()); |
|
|
|
|
workerTaskInstance.setHost(testWorkerHost); |
|
|
|
|
workerTaskInstance.setTaskType(COMMON_TASK_TYPE); |
|
|
|
|
|
|
|
|
|
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(taskInstance)); |
|
|
|
|
given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testHost)); |
|
|
|
|
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(masterTaskInstance, workerTaskInstance)); |
|
|
|
|
given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testMasterHost)); |
|
|
|
|
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance)); |
|
|
|
|
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class)); |
|
|
|
|
given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(taskInstance)); |
|
|
|
|
given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance)); |
|
|
|
|
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance); |
|
|
|
|
|
|
|
|
|
Thread.sleep(1000); |
|
|
|
|
Server server = new Server(); |
|
|
|
|
server.setHost(ip); |
|
|
|
|
server.setPort(port); |
|
|
|
|
server.setCreateTime(new Date()); |
|
|
|
|
given(registryClient.getServerList(NodeType.WORKER)).willReturn(Arrays.asList(server)); |
|
|
|
|
given(registryClient.getServerList(NodeType.MASTER)).willReturn(Arrays.asList(server)); |
|
|
|
|
Server masterServer = new Server(); |
|
|
|
|
masterServer.setHost(ip); |
|
|
|
|
masterServer.setPort(masterPort); |
|
|
|
|
masterServer.setCreateTime(new Date()); |
|
|
|
|
|
|
|
|
|
Server workerServer = new Server(); |
|
|
|
|
workerServer.setHost(ip); |
|
|
|
|
workerServer.setPort(workerPort); |
|
|
|
|
workerServer.setCreateTime(new Date()); |
|
|
|
|
|
|
|
|
|
given(registryClient.getServerList(NodeType.WORKER)).willReturn(new ArrayList<>(Arrays.asList(workerServer))); |
|
|
|
|
given(registryClient.getServerList(NodeType.MASTER)).willReturn(new ArrayList<>(Arrays.asList(masterServer))); |
|
|
|
|
ReflectionTestUtils.setField(failoverService, "registryClient", registryClient); |
|
|
|
|
|
|
|
|
|
doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class)); |
|
|
|
@ -132,26 +163,26 @@ public class FailoverServiceTest {
|
|
|
|
|
@Test |
|
|
|
|
public void failoverMasterTest() { |
|
|
|
|
processInstance.setHost(Constants.NULL); |
|
|
|
|
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); |
|
|
|
|
failoverService.failoverServerWhenDown(testHost, NodeType.MASTER); |
|
|
|
|
Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); |
|
|
|
|
|
|
|
|
|
processInstance.setHost(testHost); |
|
|
|
|
taskInstance.setState(ExecutionStatus.SUCCESS); |
|
|
|
|
failoverService.failoverServerWhenDown(testHost, NodeType.MASTER); |
|
|
|
|
Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); |
|
|
|
|
masterTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); |
|
|
|
|
failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER); |
|
|
|
|
Assert.assertNotEquals(masterTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); |
|
|
|
|
|
|
|
|
|
processInstance.setHost(testMasterHost); |
|
|
|
|
masterTaskInstance.setState(ExecutionStatus.SUCCESS); |
|
|
|
|
failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER); |
|
|
|
|
Assert.assertNotEquals(masterTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); |
|
|
|
|
Assert.assertEquals(Constants.NULL, processInstance.getHost()); |
|
|
|
|
|
|
|
|
|
processInstance.setHost(testHost); |
|
|
|
|
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); |
|
|
|
|
failoverService.failoverServerWhenDown(testHost, NodeType.MASTER); |
|
|
|
|
Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); |
|
|
|
|
processInstance.setHost(testMasterHost); |
|
|
|
|
masterTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); |
|
|
|
|
failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER); |
|
|
|
|
Assert.assertEquals(masterTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); |
|
|
|
|
Assert.assertEquals(Constants.NULL, processInstance.getHost()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void failoverWorkTest() { |
|
|
|
|
failoverService.failoverServerWhenDown(testHost, NodeType.WORKER); |
|
|
|
|
Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); |
|
|
|
|
failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER); |
|
|
|
|
Assert.assertEquals(workerTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|