diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index cbfb329279..09f4cc2433 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -106,6 +106,11 @@ public class ServerNodeManager implements InitializingBean { */ private RegistryClient registryClient = RegistryClient.getInstance(); + /** + * eg : /node/worker/group/127.0.0.1:xxx + */ + private static final int WORKER_LISTENER_CHECK_LENGTH = 5; + /** * worker group mapper */ @@ -155,7 +160,7 @@ public class ServerNodeManager implements InitializingBean { /** * init WorkerNodeListener listener */ - registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new MasterDataListener()); + registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener()); } /** @@ -210,7 +215,7 @@ public class ServerNodeManager implements InitializingBean { /** * worker group node listener */ - class WorkerGroupNodeListener implements SubscribeListener { + class WorkerDataListener implements SubscribeListener { @Override public void notify(String path, DataChangeEvent dataChangeEvent) { @@ -240,7 +245,7 @@ public class ServerNodeManager implements InitializingBean { private String parseGroup(String path) { String[] parts = path.split("/"); - if (parts.length < 6) { + if (parts.length < WORKER_LISTENER_CHECK_LENGTH) { throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path)); } return parts[parts.length - 2]; @@ -306,8 +311,7 @@ public class ServerNodeManager implements InitializingBean { /** * sync master nodes * - * @param nodes master nodes - * @param masterNodes + * @param nodes master nodes */ private void syncMasterNodes(Set nodes, List masterNodes) { masterLock.lock(); @@ -332,7 +336,7 @@ public class ServerNodeManager implements InitializingBean { * sync worker group nodes * * @param workerGroup worker group - * @param nodes worker nodes + * @param nodes worker nodes */ private void syncWorkerGroupNodes(String workerGroup, Set nodes) { workerGroupLock.lock(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 9c8047358c..363b497278 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -131,11 +131,10 @@ public class WorkerRegistryClient { public Set getWorkerZkPaths() { Set workerPaths = Sets.newHashSet(); String address = getLocalAddress(); - String workerZkPathPrefix = REGISTRY_DOLPHINSCHEDULER_WORKERS; for (String workGroup : this.workerGroups) { StringJoiner workerPathJoiner = new StringJoiner(SINGLE_SLASH); - workerPathJoiner.add(workerZkPathPrefix); + workerPathJoiner.add(REGISTRY_DOLPHINSCHEDULER_WORKERS); if (StringUtils.isEmpty(workGroup)) { workGroup = DEFAULT_WORKER_GROUP; }