Browse Source

refactor zk tree cache

pull/2/head
loushang 5 years ago
parent
commit
84df6ed8de
  1. 48
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java
  2. 8
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
  3. 94
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  4. 52
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java

48
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java

@ -16,8 +16,10 @@
*/ */
package org.apache.dolphinscheduler.common.zk; package org.apache.dolphinscheduler.common.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,30 +36,37 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class); private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class);
//kay is zk path, value is TreeCache
private ConcurrentHashMap<String, TreeCache> allCaches = new ConcurrentHashMap<>();
TreeCache treeCache;
/** /**
* @param cachePath zk path * register a unified listener of /${dsRoot},
* @param listener operator
*/ */
public void registerListener(final String cachePath, final TreeCacheListener listener) { @Override
TreeCache newCache = new TreeCache(zkClient, cachePath); protected void registerListener() {
logger.info("add listener to zk path: {}", cachePath); treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot());
logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
try { try {
newCache.start(); treeCache.start();
} catch (Exception e) { } catch (Exception e) {
logger.error("add listener to zk path: {} failed", cachePath); logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
throw new RuntimeException(e); throw new RuntimeException(e);
} }
newCache.getListenable().addListener(listener); treeCache.getListenable().addListener((client, event) -> {
String path = null == event.getData() ? "" : event.getData().getPath();
if (path.isEmpty()) {
return;
}
dataChanged(client, event, path);
});
allCaches.put(cachePath, newCache);
} }
//for sub class
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){}
public String getFromCache(final String cachePath, final String key) { public String getFromCache(final String cachePath, final String key) {
ChildData resultInCache = allCaches.get(checkNotNull(cachePath)).getCurrentData(key); ChildData resultInCache = treeCache.getCurrentData(key);
if (null != resultInCache) { if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8); return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8);
} }
@ -65,18 +74,15 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
} }
public TreeCache getTreeCache(final String cachePath) { public TreeCache getTreeCache(final String cachePath) {
return allCaches.get(checkNotNull(cachePath)); return treeCache;
} }
public void close() { public void close() {
treeCache.close();
allCaches.forEach((path, cache) -> { try {
cache.close(); Thread.sleep(500);
try { } catch (InterruptedException ignore) {
Thread.sleep(500); }
} catch (InterruptedException ignore) {
}
});
super.close(); super.close();
} }
} }

8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java

@ -57,11 +57,13 @@ public class ZookeeperOperator implements InitializingBean {
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
this.zkClient = buildClient(); this.zkClient = buildClient();
initStateLister(); initStateLister();
//init(); registerListener();
} }
//for subclass /**
//protected void init(){} * this method is for sub class,
*/
protected void registerListener(){}
public void initStateLister() { public void initStateLister() {
checkNotNull(zkClient); checkNotNull(zkClient);

94
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -101,12 +101,6 @@ public class ZKMasterClient extends AbstractZKClient {
// init system znode // init system znode
this.initSystemZNode(); this.initSystemZNode();
// monitor master
this.listenerMaster();
// monitor worker
this.listenerWorker();
// register master // register master
this.registerMaster(); this.registerMaster();
@ -158,31 +152,24 @@ public class ZKMasterClient extends AbstractZKClient {
} }
} }
/** /**
* monitor master * handle path events that this class cares about
* @param client zkClient
* @param event path event
* @param path zk path
*/ */
public void listenerMaster(){ @Override
registerListener(getZNodeParentPath(ZKNodeType.MASTER), new AbstractListener() { protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
@Override if(path.equals(getZNodeParentPath(ZKNodeType.MASTER))){ //monitor master
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { logger.info("master path event touch down");
switch (event.getType()) { handleMasterEvent(event,path);
case NODE_ADDED:
logger.info("master node added : {}", path); }else if(path.equals(getZNodeParentPath(ZKNodeType.WORKER))){ //monitor worker
break; logger.info("worker path event touch down");
case NODE_REMOVED: handleWorkerEvent(event,path);
String serverHost = getHostByEventDataPath(path); }
if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) { //other path event, ignore
return; }
}
removeZKNodePath(path, ZKNodeType.MASTER, true);
break;
default:
break;
}
}
});
}
/** /**
* remove zookeeper node path * remove zookeeper node path
@ -273,25 +260,40 @@ public class ZKMasterClient extends AbstractZKClient {
} }
/** /**
* monitor worker * monitor master
*/ */
public void listenerWorker(){ public void handleMasterEvent(TreeCacheEvent event, String path){
registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() { switch (event.getType()) {
@Override case NODE_ADDED:
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { logger.info("master node added : {}", path);
switch (event.getType()) { break;
case NODE_ADDED: case NODE_REMOVED:
logger.info("worker node added : {}", path); String serverHost = getHostByEventDataPath(path);
break; if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) {
case NODE_REMOVED: return;
logger.info("worker node deleted : {}", path);
removeZKNodePath(path, ZKNodeType.WORKER, true);
break;
default:
break;
} }
} removeZKNodePath(path, ZKNodeType.MASTER, true);
}); break;
default:
break;
}
}
/**
* monitor worker
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
case NODE_ADDED:
logger.info("worker node added : {}", path);
break;
case NODE_REMOVED:
logger.info("worker node deleted : {}", path);
removeZKNodePath(path, ZKNodeType.WORKER, true);
break;
default:
break;
}
} }

52
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java

@ -61,9 +61,6 @@ public class ZKWorkerClient extends AbstractZKClient {
// init system znode // init system znode
this.initSystemZNode(); this.initSystemZNode();
// monitor worker
this.listenerWorker();
// register worker // register worker
this.registWorker(); this.registWorker();
} }
@ -85,29 +82,36 @@ public class ZKWorkerClient extends AbstractZKClient {
} }
/** /**
* monitor worker * handle path events that this class cares about
* @param client zkClient
* @param event path event
* @param path zk path
*/ */
private void listenerWorker(){ @Override
registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() { protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
@Override if(path.equals(getZNodeParentPath(ZKNodeType.WORKER))){
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { handleWorkerEvent(event,path);
switch (event.getType()) { }
case NODE_ADDED: }
logger.info("worker node added : {}", path);
break;
case NODE_REMOVED:
//find myself dead
String serverHost = getHostByEventDataPath(path);
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
return;
}
break;
default:
break;
}
}
});
/**
* monitor worker
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
case NODE_ADDED:
logger.info("worker node added : {}", path);
break;
case NODE_REMOVED:
//find myself dead
String serverHost = getHostByEventDataPath(path);
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
return;
}
break;
default:
break;
}
} }
/** /**

Loading…
Cancel
Save