Browse Source

[cherry-pick][DS-7004][MasterServer]fix master still work when it lose zk connection (#7058)

2.0.7-release
wind 3 years ago committed by GitHub
parent
commit
19cb1237b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  2. 13
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -290,20 +290,20 @@ public class MasterRegistryClient {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (workerHost == null
|| !checkOwner
|| processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
|| !checkOwner
|| processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
// only failover the task owned myself if worker down.
if (processInstance == null) {
logger.error("failover error, the process {} of task {} do not exists.",
taskInstance.getProcessInstanceId(), taskInstance.getId());
taskInstance.getProcessInstanceId(), taskInstance.getId());
continue;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
@ -364,16 +364,33 @@ public class MasterRegistryClient {
registryClient);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
registryClient.addConnectionStateListener(newState -> {
if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) {
registryClient.persistEphemeral(localNodePath, "");
}
});
registryClient.addConnectionStateListener(this::handleConnectionState);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
}
public void handleConnectionState(ConnectionState state) {
switch (state) {
case CONNECTED:
logger.debug("registry connection state is {}", state);
break;
case SUSPENDED:
logger.warn("registry connection state is {}, ready to stop myself", state);
registryClient.getStoppable().stop("registry connection state is SUSPENDED, stop myself");
break;
case RECONNECTED:
logger.debug("registry connection state is {}, clean the node info", state);
registryClient.persistEphemeral(localNodePath, "");
break;
case DISCONNECTED:
logger.warn("registry connection state is {}, ready to stop myself", state);
registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
break;
default:
}
}
public void deregister() {
try {
String address = getLocalAddress();

13
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@ -48,7 +49,7 @@ import org.springframework.test.util.ReflectionTestUtils;
* MasterRegistryClientTest
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
@PrepareForTest({RegistryClient.class})
@PowerMockIgnore({"javax.management.*"})
public class MasterRegistryClientTest {
@ -72,6 +73,9 @@ public class MasterRegistryClientTest {
given(registryClient.getLock(Mockito.anyString())).willReturn(true);
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
given(registryClient.getStoppable()).willReturn(cause -> {
});
doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
@ -101,6 +105,13 @@ public class MasterRegistryClientTest {
masterRegistryClient.registry();
}
@Test
public void handleConnectionStateTest() {
masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED);
masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED);
masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED);
}
@Test
public void removeNodePathTest() {
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);

Loading…
Cancel
Save