diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index ee6349afb3..318a06fc6f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -290,20 +290,20 @@ public class MasterRegistryClient { ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); if (workerHost == null - || !checkOwner - || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { + || !checkOwner + || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { // only failover the task owned myself if worker down. if (processInstance == null) { logger.error("failover error, the process {} of task {} do not exists.", - taskInstance.getProcessInstanceId(), taskInstance.getId()); + taskInstance.getProcessInstanceId(), taskInstance.getId()); continue; } taskInstance.setProcessInstance(processInstance); TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); // only kill yarn job if exists , the local thread has exited ProcessUtils.killYarnJob(taskExecutionContext); @@ -364,16 +364,33 @@ public class MasterRegistryClient { registryClient); registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); - registryClient.addConnectionStateListener(newState -> { - if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) { - 
registryClient.persistEphemeral(localNodePath, ""); - } - }); + registryClient.addConnectionStateListener(this::handleConnectionState); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); } + /** Reacts to a registry connection state change; registered through registryClient.addConnectionStateListener(this::handleConnectionState). CONNECTED is only logged at debug level. SUSPENDED and DISCONNECTED log a warning and call stop(...) on registryClient.getStoppable(). RECONNECTED logs at debug level and calls registryClient.persistEphemeral(localNodePath, "") so the node at localNodePath is written again with an empty value. Any other state reaches the empty default branch and is ignored. NOTE(review): SUSPENDED triggers the same stop as DISCONNECTED - confirm that stopping on a possibly transient suspension is intended. @param state the new registry connection state */ public void handleConnectionState(ConnectionState state) { + switch (state) { + case CONNECTED: + logger.debug("registry connection state is {}", state); + break; + case SUSPENDED: + logger.warn("registry connection state is {}, ready to stop myself", state); + registryClient.getStoppable().stop("registry connection state is SUSPENDED, stop myself"); + break; + case RECONNECTED: + logger.debug("registry connection state is {}, clean the node info", state); + registryClient.persistEphemeral(localNodePath, ""); + break; + case DISCONNECTED: + logger.warn("registry connection state is {}, ready to stop myself", state); + registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself"); + break; + default: + } + } + public void deregister() { try { String address = getLocalAddress(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index c643818328..65d6b89033 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import 
org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; @@ -48,7 +49,7 @@ import org.springframework.test.util.ReflectionTestUtils; * MasterRegistryClientTest */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ RegistryClient.class }) +@PrepareForTest({RegistryClient.class}) @PowerMockIgnore({"javax.management.*"}) public class MasterRegistryClientTest { @@ -72,6 +73,9 @@ public class MasterRegistryClientTest { given(registryClient.getLock(Mockito.anyString())).willReturn(true); given(registryClient.releaseLock(Mockito.anyString())).willReturn(true); given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080"); + /* Stub getStoppable() with a lambda whose body is empty, so the stop(...) calls made by handleConnectionState do nothing in this test. */ given(registryClient.getStoppable()).willReturn(cause -> { + + }); doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString()); ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient); @@ -101,6 +105,13 @@ public class MasterRegistryClientTest { masterRegistryClient.registry(); } + /** Smoke test: calls handleConnectionState with CONNECTED, RECONNECTED and SUSPENDED and makes no assertions, so it only checks that these calls complete without throwing. DISCONNECTED is not exercised here. */ @Test + public void handleConnectionStateTest() { + masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED); + masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED); + masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED); + } + @Test public void removeNodePathTest() { masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);