Browse Source

[Fix-13913][Master] syncMasterNodes does not make current slot correctly after zookeeper reconnect (#14014)

3.0.6-release
Aaron Wang 2 years ago committed by GitHub
parent
commit
57fff8c686
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
  2. 13
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java

@ -32,10 +32,12 @@ public class MasterConnectionStateListener implements ConnectionListener {
private final String masterNodePath;
private final RegistryClient registryClient;
private final MasterHeartBeatTask masterHeartBeatTask;
public MasterConnectionStateListener(String masterNodePath, RegistryClient registryClient) {
public MasterConnectionStateListener(String masterNodePath, RegistryClient registryClient, MasterHeartBeatTask masterHeartBeatTask) {
this.masterNodePath = checkNotNull(masterNodePath);
this.registryClient = checkNotNull(registryClient);
this.masterHeartBeatTask = checkNotNull(masterHeartBeatTask);
}
@Override
@ -50,7 +52,7 @@ public class MasterConnectionStateListener implements ConnectionListener {
case RECONNECTED:
logger.debug("registry connection state is {}, clean the node info", state);
registryClient.remove(masterNodePath);
registryClient.persistEphemeral(masterNodePath, "");
registryClient.persistEphemeral(masterNodePath, masterHeartBeatTask.getHeartBeatInfo());
break;
case DISCONNECTED:
logger.warn("registry connection state is {}, ready to stop myself", state);

13
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -79,6 +79,11 @@ public class MasterRegistryClient implements AutoCloseable {
*/
private ScheduledExecutorService heartBeatExecutor;
/**
* master heartbeat task
*/
private MasterHeartBeatTask masterHeartBeatTask;
/**
* master startup time, ms
*/
@ -96,7 +101,7 @@ public class MasterRegistryClient implements AutoCloseable {
// master registry
registry();
registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(),
registryClient));
registryClient, masterHeartBeatTask));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
throw new RegistryException("Master registry client start up error", e);
@ -189,7 +194,7 @@ public class MasterRegistryClient implements AutoCloseable {
logger.info("Master node : {} registering to registry center", masterAddress);
String localNodePath = getCurrentNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime,
masterHeartBeatTask = new MasterHeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
@ -197,7 +202,7 @@ public class MasterRegistryClient implements AutoCloseable {
// remove before persist
registryClient.remove(localNodePath);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
registryClient.persistEphemeral(localNodePath, masterHeartBeatTask.getHeartBeatInfo());
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
@ -210,7 +215,7 @@ public class MasterRegistryClient implements AutoCloseable {
// delete dead server
registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
this.heartBeatExecutor.scheduleAtFixedRate(masterHeartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
}

Loading…
Cancel
Save