Browse Source

[feature] #1813 remove "_001" after the master/server register path in zookeeper (#1820)

* change master/worker register path.

* remove "_" from register path.
pull/2/head
bao liang 5 years ago committed by lgcareer
parent
commit
0b272f9ca6
  1. 55
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
  2. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java

55
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java

@ -115,21 +115,19 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
} }
} }
/** /**
* create zookeeper path according the zk node type. * create zookeeper path according the zk node type.
* @param zkNodeType zookeeper node type * @param zkNodeType zookeeper node type
* @return register zookeeper path * @return register zookeeper path
* @throws Exception * @throws Exception
*/ */
private String createZNodePath(ZKNodeType zkNodeType) throws Exception { private String createZNodePath(ZKNodeType zkNodeType, String host) throws Exception {
// specify the format of stored data in ZK nodes // specify the format of stored data in ZK nodes
String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date()); String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
// create temporary sequence nodes for master znode // create temporary sequence nodes for master znode
String parentPath = getZNodeParentPath(zkNodeType); String registerPath= getZNodeParentPath(zkNodeType) + SINGLE_SLASH + host;
String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
String registerPath = serverPathPrefix + UNDERLINE; super.persistEphemeral(registerPath, heartbeatZKInfo);
super.persistEphemeral(registerPath, heartbeatZKInfo);
logger.info("register {} node {} success" , zkNodeType.toString(), registerPath); logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
return registerPath; return registerPath;
} }
@ -148,7 +146,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
zkNodeType.toString(), host); zkNodeType.toString(), host);
return registerPath; return registerPath;
} }
registerPath = createZNodePath(zkNodeType); registerPath = createZNodePath(zkNodeType, host);
// handle dead server // handle dead server
handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP); handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
@ -166,25 +164,19 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
* @throws Exception errors * @throws Exception errors
*/ */
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
//ip_sequenceno String host = getHostByEventDataPath(zNode);
String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete //check server restart, if restart , dead server path in zk should be delete
if(opType.equals(DELETE_ZK_OP)){ if(opType.equals(DELETE_ZK_OP)){
String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE); removeDeadServerByHost(host, type);
String ip = ipAndSeqNo[0];
removeDeadServerByHost(ip, type);
}else if(opType.equals(ADD_ZK_OP)){ }else if(opType.equals(ADD_ZK_OP)){
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if(!super.isExisted(deadServerPath)){ if(!super.isExisted(deadServerPath)){
//add dead server info to zk dead server path : /dead-servers/ //add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath,(type + UNDERLINE + ipSeqNo)); super.persist(deadServerPath,(type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success" , logger.info("{} server dead , and {} added to zk dead server path success" ,
zkNodeType.toString(), zNode); zkNodeType.toString(), zNode);
@ -285,21 +277,13 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
} }
Map<String, String> serverMaps = getServerMaps(zkNodeType); Map<String, String> serverMaps = getServerMaps(zkNodeType);
for(String hostKey : serverMaps.keySet()){ for(String hostKey : serverMaps.keySet()){
if(hostKey.startsWith(host + UNDERLINE)){ if(hostKey.startsWith(host)){
return true; return true;
} }
} }
return false; return false;
} }
/**
* get zkclient
* @return zookeeper client
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/** /**
* *
* @return get worker node parent path * @return get worker node parent path
@ -436,19 +420,22 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
} }
/** /**
* get host ip, string format: masterParentPath/ip_000001/value * get host ip, string format: masterParentPath/ip
* @param path path * @param path path
* @return host ip, string format: masterParentPath/ip_000001/value * @return host ip, string format: masterParentPath/ip
*/ */
protected String getHostByEventDataPath(String path) { protected String getHostByEventDataPath(String path) {
int startIndex = path.lastIndexOf("/")+1; if(StringUtils.isEmpty(path)){
int endIndex = path.lastIndexOf("_"); logger.error("empty path!");
return "";
if(startIndex >= endIndex){ }
logger.error("parse ip error"); String[] pathArray = path.split(SINGLE_SLASH);
if(pathArray.length < 1){
logger.error("parse ip error: {}", path);
return ""; return "";
} }
return path.substring(startIndex, endIndex); return pathArray[pathArray.length - 1];
} }
/** /**
* acquire zk lock * acquire zk lock

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java

@ -77,6 +77,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
return treeCache; return treeCache;
} }
@Override
public void close() { public void close() {
treeCache.close(); treeCache.close();
try { try {

Loading…
Cancel
Save