Browse Source

[Fix][Registry]server listener err (#6232)

* [Fix][Registry]server listener err
2.0.7-release
Kirs 3 years ago committed by GitHub
parent
commit
fa0b86ac82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  2. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -106,6 +106,11 @@ public class ServerNodeManager implements InitializingBean {
*/
private RegistryClient registryClient = RegistryClient.getInstance();
/**
* eg : /node/worker/group/127.0.0.1:xxx
*/
private static final int WORKER_LISTENER_CHECK_LENGTH = 5;
/**
* worker group mapper
*/
@ -155,7 +160,7 @@ public class ServerNodeManager implements InitializingBean {
/**
* init WorkerNodeListener listener
*/
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new MasterDataListener());
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
}
/**
@ -210,7 +215,7 @@ public class ServerNodeManager implements InitializingBean {
/**
* worker group node listener
*/
class WorkerGroupNodeListener implements SubscribeListener {
class WorkerDataListener implements SubscribeListener {
@Override
public void notify(String path, DataChangeEvent dataChangeEvent) {
@ -240,7 +245,7 @@ public class ServerNodeManager implements InitializingBean {
private String parseGroup(String path) {
String[] parts = path.split("/");
if (parts.length < 6) {
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
}
return parts[parts.length - 2];
@ -306,8 +311,7 @@ public class ServerNodeManager implements InitializingBean {
/**
* sync master nodes
*
* @param nodes master nodes
* @param masterNodes
* @param nodes master nodes
*/
private void syncMasterNodes(Set<String> nodes, List<Server> masterNodes) {
masterLock.lock();
@ -332,7 +336,7 @@ public class ServerNodeManager implements InitializingBean {
* sync worker group nodes
*
* @param workerGroup worker group
* @param nodes worker nodes
* @param nodes worker nodes
*/
private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes) {
workerGroupLock.lock();

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -131,11 +131,10 @@ public class WorkerRegistryClient {
public Set<String> getWorkerZkPaths() {
Set<String> workerPaths = Sets.newHashSet();
String address = getLocalAddress();
String workerZkPathPrefix = REGISTRY_DOLPHINSCHEDULER_WORKERS;
for (String workGroup : this.workerGroups) {
StringJoiner workerPathJoiner = new StringJoiner(SINGLE_SLASH);
workerPathJoiner.add(workerZkPathPrefix);
workerPathJoiner.add(REGISTRY_DOLPHINSCHEDULER_WORKERS);
if (StringUtils.isEmpty(workGroup)) {
workGroup = DEFAULT_WORKER_GROUP;
}

Loading…
Cancel
Save