diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java index cf4980147e..daec765315 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java @@ -16,8 +16,10 @@ */ package org.apache.dolphinscheduler.common.zk; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,30 +36,37 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class); - //kay is zk path, value is TreeCache - private ConcurrentHashMap allCaches = new ConcurrentHashMap<>(); + TreeCache treeCache; /** - * @param cachePath zk path - * @param listener operator + * register a unified listener of /${dsRoot}, */ - public void registerListener(final String cachePath, final TreeCacheListener listener) { - TreeCache newCache = new TreeCache(zkClient, cachePath); - logger.info("add listener to zk path: {}", cachePath); + @Override + protected void registerListener() { + treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot()); + logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot()); try { - newCache.start(); + treeCache.start(); } catch (Exception e) { - logger.error("add listener to zk path: {} failed", cachePath); + logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot()); throw new RuntimeException(e); } - newCache.getListenable().addListener(listener); + treeCache.getListenable().addListener((client, event) -> { + String path = null == event.getData() ? "" : event.getData().getPath(); + if (path.isEmpty()) { + return; + } + dataChanged(client, event, path); + }); - allCaches.put(cachePath, newCache); } + //for sub class + protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){} + public String getFromCache(final String cachePath, final String key) { - ChildData resultInCache = allCaches.get(checkNotNull(cachePath)).getCurrentData(key); + ChildData resultInCache = treeCache.getCurrentData(key); if (null != resultInCache) { return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8); } @@ -65,18 +74,15 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { } public TreeCache getTreeCache(final String cachePath) { - return allCaches.get(checkNotNull(cachePath)); + return treeCache; } public void close() { - - allCaches.forEach((path, cache) -> { - cache.close(); - try { - Thread.sleep(500); - } catch (InterruptedException ignore) { - } - }); + treeCache.close(); + try { + Thread.sleep(500); + } catch (InterruptedException ignore) { + } super.close(); } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java index f4d72f436e..2203f2ef3c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java @@ -57,11 +57,13 @@ public class ZookeeperOperator implements InitializingBean { public void afterPropertiesSet() throws Exception { this.zkClient = buildClient(); initStateLister(); - //init(); + registerListener(); } - //for subclass - //protected void init(){} + /** + * this method is for sub class, + */ + protected void registerListener(){} public void initStateLister() { checkNotNull(zkClient); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 2aec6ecaf6..168db366c6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -101,12 +101,6 @@ public class ZKMasterClient extends AbstractZKClient { // init system znode this.initSystemZNode(); - // monitor master - this.listenerMaster(); - - // monitor worker - this.listenerWorker(); - // register master this.registerMaster(); @@ -158,31 +152,24 @@ public class ZKMasterClient extends AbstractZKClient { } } - /** - * monitor master + * handle path events that this class cares about + * @param client zkClient + * @param event path event + * @param path zk path */ - public void listenerMaster(){ - registerListener(getZNodeParentPath(ZKNodeType.MASTER), new AbstractListener() { - @Override - protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - switch (event.getType()) { - case NODE_ADDED: - logger.info("master node added : {}", path); - break; - case NODE_REMOVED: - String serverHost = getHostByEventDataPath(path); - if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) { - return; - } - removeZKNodePath(path, ZKNodeType.MASTER, true); - break; - default: - break; - } - } - }); -} + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + if(path.equals(getZNodeParentPath(ZKNodeType.MASTER))){ //monitor master + logger.info("master path event touch down"); + handleMasterEvent(event,path); + + }else if(path.equals(getZNodeParentPath(ZKNodeType.WORKER))){ //monitor worker + logger.info("worker path event touch down"); + handleWorkerEvent(event,path); + } + //other path event, ignore + } /** * remove zookeeper node path @@ -273,25 +260,40 @@ public class ZKMasterClient extends AbstractZKClient { } /** - * monitor worker + * monitor master */ - public void listenerWorker(){ - registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() { - @Override - protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - switch (event.getType()) { - case NODE_ADDED: - logger.info("worker node added : {}", path); - break; - case NODE_REMOVED: - logger.info("worker node deleted : {}", path); - removeZKNodePath(path, ZKNodeType.WORKER, true); - break; - default: - break; + public void handleMasterEvent(TreeCacheEvent event, String path){ + switch (event.getType()) { + case NODE_ADDED: + logger.info("master node added : {}", path); + break; + case NODE_REMOVED: + String serverHost = getHostByEventDataPath(path); + if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) { + return; } - } - }); + removeZKNodePath(path, ZKNodeType.MASTER, true); + break; + default: + break; + } + } + + /** + * monitor worker + */ + public void handleWorkerEvent(TreeCacheEvent event, String path){ + switch (event.getType()) { + case NODE_ADDED: + logger.info("worker node added : {}", path); + break; + case NODE_REMOVED: + logger.info("worker node deleted : {}", path); + removeZKNodePath(path, ZKNodeType.WORKER, true); + break; + default: + break; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java index 0dd1cf15be..192d9e0e58 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java @@ -61,9 +61,6 @@ public class ZKWorkerClient extends AbstractZKClient { // init system znode this.initSystemZNode(); - // monitor worker - this.listenerWorker(); - // register worker this.registWorker(); } @@ -83,31 +80,38 @@ public class ZKWorkerClient extends AbstractZKClient { System.exit(-1); } } - + /** - * monitor worker + * handle path events that this class cares about + * @param client zkClient + * @param event path event + * @param path zk path */ - private void listenerWorker(){ - registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() { - @Override - protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - switch (event.getType()) { - case NODE_ADDED: - logger.info("worker node added : {}", path); - break; - case NODE_REMOVED: - //find myself dead - String serverHost = getHostByEventDataPath(path); - if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){ - return; - } - break; - default: - break; - } - } - }); + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + if(path.equals(getZNodeParentPath(ZKNodeType.WORKER))){ + handleWorkerEvent(event,path); + } + } + /** + * monitor worker + */ + public void handleWorkerEvent(TreeCacheEvent event, String path){ + switch (event.getType()) { + case NODE_ADDED: + logger.info("worker node added : {}", path); + break; + case NODE_REMOVED: + //find myself dead + String serverHost = getHostByEventDataPath(path); + if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){ + return; + } + break; + default: + break; + } } /**