Browse Source

[Improvement][Server] Must restart master if Zk reconnect (#5210) (#5211)

pull/3/MERGE
ruanwenjun 4 years ago committed by GitHub
parent
commit
842c5400e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  3. 12
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -90,10 +90,8 @@ public class MasterRegistry {
logger.error("master : {} connection lost from zookeeper", address); logger.error("master : {} connection lost from zookeeper", address);
} else if (newState == ConnectionState.RECONNECTED) { } else if (newState == ConnectionState.RECONNECTED) {
logger.info("master : {} reconnected to zookeeper", address); logger.info("master : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
} else if (newState == ConnectionState.SUSPENDED) { } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("master : {} connection SUSPENDED ", address); logger.warn("master : {} connection SUSPENDED ", address);
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
} }
}); });
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -111,10 +111,8 @@ public class WorkerRegistry {
logger.error("worker : {} connection lost from zookeeper", address); logger.error("worker : {} connection lost from zookeeper", address);
} else if (newState == ConnectionState.RECONNECTED) { } else if (newState == ConnectionState.RECONNECTED) {
logger.info("worker : {} reconnected to zookeeper", address); logger.info("worker : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
} else if (newState == ConnectionState.SUSPENDED) { } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ", address); logger.warn("worker : {} connection SUSPENDED ", address);
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
} }
}); });
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);

12
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -189,18 +189,20 @@ public class ZookeeperOperator implements InitializingBean {
} }
} }
public void persistEphemeral(final String key, final String value) { public void persistEphemeral(final String path, final String value) {
try { try {
if (isExisted(key)) { // If the ephemeral node exist and the data is not equals to the given value
// delete the old node
if (isExisted(path) && !value.equals(get(path))) {
try { try {
zkClient.delete().deletingChildrenIfNeeded().forPath(key); zkClient.delete().deletingChildrenIfNeeded().forPath(path);
} catch (NoNodeException ignore) { } catch (NoNodeException ignore) {
//NOP //NOP
} }
} }
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, value.getBytes(StandardCharsets.UTF_8));
} catch (final Exception ex) { } catch (final Exception ex) {
logger.error("persistEphemeral key : {} , value : {}", key, value, ex); logger.error("persistEphemeral path : {} , value : {}", path, value, ex);
} }
} }

Loading…
Cancel
Save