From e4210d14514effc71b7e2b0737ed82781bea33f4 Mon Sep 17 00:00:00 2001 From: lenboo Date: Mon, 29 Jul 2019 18:52:10 +0800 Subject: [PATCH] refactor zk client. --- .../api/service/MonitorService.java | 8 +- .../api/utils/ZookeeperMonitor.java | 4 +- .../common/zk/AbstractZKClient.java | 94 ++++++++- .../server/master/MasterServer.java | 19 +- .../escheduler/server/zk/ZKMasterClient.java | 199 +++++++----------- .../escheduler/server/zk/ZKWorkerClient.java | 29 +-- 6 files changed, 177 insertions(+), 176 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java index e754fd10e0..cc6a847578 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java @@ -65,7 +65,7 @@ public class MonitorService extends BaseService{ Map result = new HashMap<>(5); - List masterServers = getServerList(true); + List masterServers = getServerListFromZK(true); result.put(Constants.DATA_LIST, masterServers); putMsg(result,Status.SUCCESS); @@ -99,7 +99,7 @@ public class MonitorService extends BaseService{ public Map queryWorker(User loginUser) { Map result = new HashMap<>(5); - List workerServers = getServerList(false); + List workerServers = getServerListFromZK(false); result.put(Constants.DATA_LIST, workerServers); putMsg(result,Status.SUCCESS); @@ -107,13 +107,13 @@ public class MonitorService extends BaseService{ return result; } - private List getServerList(boolean isMaster){ + private List getServerListFromZK(boolean isMaster){ List servers = new ArrayList<>(); ZookeeperMonitor zookeeperMonitor = null; try{ zookeeperMonitor = new ZookeeperMonitor(); ZKNodeType zkNodeType = isMaster ? 
ZKNodeType.MASTER : ZKNodeType.WORKER; - servers = zookeeperMonitor.getServers(zkNodeType); + servers = zookeeperMonitor.getServersList(zkNodeType); }catch (Exception e){ throw e; }finally { diff --git a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java index ddbb6c9c1a..dc6f95d7ec 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java @@ -40,7 +40,7 @@ public class ZookeeperMonitor extends AbstractZKClient{ * @return */ public List getMasterServers(){ - return getServers(ZKNodeType.MASTER); + return getServersList(ZKNodeType.MASTER); } /** @@ -48,7 +48,7 @@ public class ZookeeperMonitor extends AbstractZKClient{ * @return */ public List getWorkerServers(){ - return getServers(ZKNodeType.WORKER); + return getServersList(ZKNodeType.WORKER); } private static List zookeeperInfoList(String zookeeperServers) { diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 5d3586a301..46cb7a980e 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -20,7 +20,6 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.IStoppable; import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.model.MasterServer; -import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.ResInfo; @@ -223,28 +222,68 @@ public abstract class AbstractZKClient { } } + + /** + * create zookeeper path according the zk node type. 
+ * @param zkNodeType + * @return + * @throws Exception + */ + private String createZNodePath(ZKNodeType zkNodeType) throws Exception { + // specify the format of stored data in ZK nodes + String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date()); + // create temporary sequence nodes under the parent path of the given node type + String parentPath = getZNodeParentPath(zkNodeType); + String serverPathPrefix = parentPath + "/" + OSUtils.getHost(); + String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( + serverPathPrefix + "_", heartbeatZKInfo.getBytes()); + logger.info("register {} node {} success" , zkNodeType.toString(), registerPath); + return registerPath; + } + + /** + * register server, if server already exists, return null. + * @param zkNodeType + * @return register server path in zookeeper + */ + public String registerServer(ZKNodeType zkNodeType) throws Exception { + String registerPath = null; + String host = OSUtils.getHost(); + if(checkZKNodeExists(host, zkNodeType)){ + logger.error("register failure , {} server already started on host : {}" , + zkNodeType.toString(), host); + return registerPath; + } + registerPath = createZNodePath(zkNodeType); + + // handle dead server + handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP); + + return registerPath; + } + /** * opType(add): if find dead server , then add to zk deadServerPath * opType(delete): delete path from zk * * @param zNode node path - * @param serverType master or worker prefix + * @param zkNodeType master or worker * @param opType delete or add * @throws Exception */ - public void handleDeadServer(String zNode, String serverType, String opType) throws Exception { + public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { //ip_sequenceno String[] zNodesPath = zNode.split("\\/"); String ipSeqNo = zNodesPath[zNodesPath.length - 1]; - String type = serverType.equals(MASTER_PREFIX) ? 
MASTER_PREFIX : WORKER_PREFIX; + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; //check server restart, if restart , dead server path in zk should be delete if(opType.equals(DELETE_ZK_OP)){ String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE); String ip = ipAndSeqNo[0]; - removeDeadServerByHost(ip, serverType); + removeDeadServerByHost(ip, type); }else if(opType.equals(ADD_ZK_OP)){ String deadServerPath = deadServerZNodeParentPath + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; @@ -253,7 +292,8 @@ public abstract class AbstractZKClient { zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes()); - logger.info("{} server dead , and {} added to zk dead server path success" , serverType, zNode); + logger.info("{} server dead , and {} added to zk dead server path success" , + zkNodeType.toString(), zNode); } } @@ -314,8 +354,8 @@ public abstract class AbstractZKClient { * @param zkNodeType * @return */ - public List getServers(ZKNodeType zkNodeType){ - Map masterMap = getServerList(zkNodeType); + public List getServersList(ZKNodeType zkNodeType){ + Map masterMap = getServerMaps(zkNodeType); String parentPath = getZNodeParentPath(zkNodeType); List masterServers = new ArrayList<>(); @@ -332,7 +372,7 @@ public abstract class AbstractZKClient { * result : {host : resource info} * @return */ - public Map getServerList(ZKNodeType zkNodeType){ + public Map getServerMaps(ZKNodeType zkNodeType){ Map masterMap = new HashMap<>(); try { @@ -363,7 +403,7 @@ public abstract class AbstractZKClient { host, zkNodeType.toString()); return false; } - Map serverMaps = getServerList(zkNodeType); + Map serverMaps = getServerMaps(zkNodeType); for(String hostKey : serverMaps.keySet()){ if(hostKey.startsWith(host)){ return true; @@ -497,7 +537,39 @@ public abstract class AbstractZKClient { } } - @Override + /** + * server self dead, stop all threads + * @param serverHost + * @param zkNodeType + */ + protected boolean checkServerSelfDead(String 
serverHost, ZKNodeType zkNodeType) { + if (serverHost.equals(OSUtils.getHost())) { + logger.error("{} server({}) of myself dead , stopping...", + zkNodeType.toString(), serverHost); + stoppable.stop(String.format(" %s server %s of myself dead , stopping...", + zkNodeType.toString(), serverHost)); + return true; + } + return false; + } + + /** + * get host ip, string format: masterParentPath/ip_000001/value + * @param path + * @return + */ + protected String getHostByEventDataPath(String path) { + int startIndex = path.lastIndexOf("/")+1; + int endIndex = path.lastIndexOf("_"); + + if(startIndex >= endIndex){ + logger.error("parse ip error"); + return ""; + } + return path.substring(startIndex, endIndex); + } + + @Override public String toString() { return "AbstractZKClient{" + "zkClient=" + zkClient + diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java index bf0dcbfe75..562b6509e5 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java @@ -119,8 +119,6 @@ public class MasterServer implements CommandLineRunner, IStoppable { public MasterServer(ProcessDao processDao){ zkMasterClient = ZKMasterClient.getZKMasterClient(processDao); - this.serverDao = zkMasterClient.getServerDao(); - this.alertDao = zkMasterClient.getAlertDao(); } public void run(ProcessDao processDao){ @@ -128,6 +126,11 @@ public class MasterServer implements CommandLineRunner, IStoppable { heartBeatInterval = conf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL, Constants.defaultMasterHeartbeatInterval); + // master exec thread pool num + int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS, + Constants.defaultMasterExecThreadNum); + + heartbeatMasterService = 
ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum); // heartbeat thread implement @@ -140,10 +143,6 @@ public class MasterServer implements CommandLineRunner, IStoppable { heartbeatMasterService. scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS); - // master exec thread pool num - int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS, - Constants.defaultMasterExecThreadNum); - // master scheduler thread MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread( zkMasterClient, @@ -154,6 +153,8 @@ public class MasterServer implements CommandLineRunner, IStoppable { masterSchedulerService.execute(masterSchedulerThread); // start QuartzExecutors + // TODO... + // what system should do if exception try { ProcessScheduleJob.init(processDao); QuartzExecutors.getInstance().start(); @@ -173,13 +174,11 @@ public class MasterServer implements CommandLineRunner, IStoppable { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - String host = OSUtils.getHost(); - // clear master table register info - serverDao.deleteMaster(host); logger.info("master server stopped"); if (zkMasterClient.getActiveMasterNum() <= 1) { for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, host, "Master-Server"); + zkMasterClient.getAlertDao().sendServerStopedAlert( + 1, OSUtils.getHost(), "Master-Server"); } } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index 2e77bb6fc6..c972580e31 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -20,8 +20,6 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ExecutionStatus; import 
cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.model.MasterServer; -import cn.escheduler.common.utils.CollectionUtils; -import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.DaoFactory; @@ -29,8 +27,6 @@ import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ServerDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; -import cn.escheduler.dao.model.WorkerServer; -import cn.escheduler.common.utils.ResInfo; import cn.escheduler.server.utils.ProcessUtils; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; @@ -39,7 +35,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.utils.ThreadUtils; -import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +63,7 @@ public class ZKMasterClient extends AbstractZKClient { * master database access */ private ServerDao serverDao = null; + /** * alert database access */ @@ -77,9 +73,6 @@ public class ZKMasterClient extends AbstractZKClient { */ private ProcessDao processDao; - - private Date createTime = null; - /** * zkMasterClient */ @@ -131,7 +124,7 @@ public class ZKMasterClient extends AbstractZKClient { this.listenerWorker(); // register master - this.registMaster(); + this.registerMaster(); // check if fault tolerance is required,failure and tolerance if (getActiveMasterNum() == 1) { @@ -157,15 +150,6 @@ public class ZKMasterClient extends AbstractZKClient { this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } - - /** - * get maste dao - * @return - */ - public ServerDao getServerDao(){ - return 
serverDao; - } - /** * get alert dao * @return @@ -174,40 +158,24 @@ public class ZKMasterClient extends AbstractZKClient { return alertDao; } + + + /** * register master znode */ - public void registMaster(){ - - // get current date - Date now = new Date(); - createTime = now ; + public void registerMaster(){ try { - String osHost = OSUtils.getHost(); - - // zookeeper node exists, cannot start a new one. - if(checkZKNodeExists(osHost, ZKNodeType.MASTER)){ - logger.error("register failure , master already started on host : {}" , osHost); - // exit system - System.exit(-1); + String serverPath = registerServer(ZKNodeType.MASTER); + if(StringUtils.isEmpty(serverPath)){ + System.exit(-1); } - createMasterZNode(now); - logger.info("register master node {} success" , masterZNode); - - // handle dead server - handleDeadServer(masterZNode, Constants.MASTER_PREFIX, Constants.DELETE_ZK_OP); } catch (Exception e) { logger.error("register master failure : " + e.getMessage(),e); + System.exit(-1); } } - private void createMasterZNode(Date now) throws Exception { - // specify the format of stored data in ZK nodes - String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now); - // create temporary sequence nodes for master znode - masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( - masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes()); - } /** @@ -217,8 +185,6 @@ public class ZKMasterClient extends AbstractZKClient { PathChildrenCache masterPc = new PathChildrenCache(zkClient, masterZNodeParentPath, true ,defaultThreadFactory); try { - Date now = new Date(); - createTime = now ; masterPc.start(); masterPc.getListenable().addListener(new PathChildrenCacheListener() { @Override @@ -229,8 +195,11 @@ public class ZKMasterClient extends AbstractZKClient { break; case CHILD_REMOVED: String path = event.getData().getPath(); - logger.info("master node deleted : {}",event.getData().getPath()); - removeMasterNode(path); + String 
serverHost = getHostByEventDataPath(path); + if(checkServerSelfDead(serverHost, ZKNodeType.MASTER)){ + return; + } + removeZKNodePath(path, ZKNodeType.MASTER, true); break; case CHILD_UPDATED: break; @@ -242,46 +211,69 @@ public class ZKMasterClient extends AbstractZKClient { }catch (Exception e){ logger.error("monitor master failed : " + e.getMessage(),e); } +} - } - - private void removeMasterNode(String path) { - InterProcessMutex mutexLock = null; + private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { + logger.info("{} node deleted : {}", zkNodeType.toString(), path); + InterProcessMutex mutex = null; try { - // handle dead server, add to zk dead server pth - handleDeadServer(path, Constants.MASTER_PREFIX, Constants.ADD_ZK_OP); + String failoverPath = getFailoverLockPath(zkNodeType); + // create a distributed lock + mutex = new InterProcessMutex(getZkClient(), failoverPath); + mutex.acquire(); - if(masterZNode.equals(path)){ - logger.error("master server({}) of myself dead , stopping...", path); - stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path)); - return; + String serverHost = getHostByEventDataPath(path); + // handle dead server + handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); + //alert server down. 
+ alertServerDown(serverHost, zkNodeType); + //failover server + if(failover){ + failoverServerWhenDown(serverHost, zkNodeType); } + }catch (Exception e){ + logger.error("{} server failover failed.", zkNodeType.toString()); + logger.error("failover exception : " + e.getMessage(),e); + } + finally { + releaseMutex(mutex); + } + } - // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master - String znodeLock = zkMasterClient.getMasterFailoverLockPath(); - mutexLock = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); - mutexLock.acquire(); + private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { + if(StringUtils.isEmpty(serverHost)){ + return ; + } + switch (zkNodeType){ + case MASTER: + failoverMaster(serverHost); + break; + case WORKER: + failoverWorker(serverHost, true); + default: + break; + } + } - String masterHost = getHostByEventDataPath(path); - for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, masterHost, "Master-Server"); - } - if(StringUtils.isNotEmpty(masterHost)){ - failoverMaster(masterHost); - } - }catch (Exception e){ - logger.error("master failover failed : " + e.getMessage(),e); - }finally { - if (mutexLock != null){ - try { - mutexLock.release(); - } catch (Exception e) { - logger.error("lock relase failed : " + e.getMessage(),e); - } - } + private String getFailoverLockPath(ZKNodeType zkNodeType){ + + switch (zkNodeType){ + case MASTER: + return getMasterFailoverLockPath(); + case WORKER: + return getWorkerFailoverLockPath(); + default: + return ""; } } + private void alertServerDown(String serverHost, ZKNodeType zkNodeType) { + + String serverType = zkNodeType.toString(); + for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER; i++) { + alertDao.sendServerStopedAlert(1, serverHost, serverType); + } + } /** * monitor worker @@ -290,8 +282,6 @@ public class ZKMasterClient extends 
AbstractZKClient { PathChildrenCache workerPc = new PathChildrenCache(zkClient,workerZNodeParentPath,true ,defaultThreadFactory); try { - Date now = new Date(); - createTime = now ; workerPc.start(); workerPc.getListenable().addListener(new PathChildrenCacheListener() { @Override @@ -303,7 +293,7 @@ public class ZKMasterClient extends AbstractZKClient { case CHILD_REMOVED: String path = event.getData().getPath(); logger.info("node deleted : {}",event.getData().getPath()); - removeZKNodePath(path); + removeZKNodePath(path, ZKNodeType.WORKER, true); break; default: break; @@ -315,33 +305,6 @@ public class ZKMasterClient extends AbstractZKClient { } } - private void removeZKNodePath(String path) { - InterProcessMutex mutex = null; - try { - - // handle dead server - handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); - - // create a distributed lock - String znodeLock = getWorkerFailoverLockPath(); - mutex = new InterProcessMutex(getZkClient(), znodeLock); - mutex.acquire(); - - String workerHost = getHostByEventDataPath(path); - for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server"); - } - - if(StringUtils.isNotEmpty(workerHost)){ - failoverWorker(workerHost, true); - } - }catch (Exception e){ - logger.error("worker failover failed : " + e.getMessage(),e); - } - finally { - releaseMutex(mutex); - } - } /** * get master znode @@ -381,7 +344,7 @@ public class ZKMasterClient extends AbstractZKClient { return false; } Date workerServerStartDate = null; - List workerServers= getServers(ZKNodeType.WORKER); + List workerServers= getServersList(ZKNodeType.WORKER); for(MasterServer server : workerServers){ if(server.getHost().equals(taskInstance.getHost())){ workerServerStartDate = server.getCreateTime(); @@ -444,24 +407,4 @@ public class ZKMasterClient extends AbstractZKClient { logger.info("master failover end"); } - /** - * get host ip, string format: 
masterParentPath/ip_000001/value - * @param path - * @return - */ - private String getHostByEventDataPath(String path) { - int startIndex = path.lastIndexOf("/")+1; - int endIndex = path.lastIndexOf("_"); - - if(startIndex >= endIndex){ - logger.error("parse ip error"); - return ""; - } - return path.substring(startIndex, endIndex); - } - - - - - } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java index 2090ec1bdd..0c35728f84 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java @@ -18,12 +18,12 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ZKNodeType; -import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.ServerDao; import cn.escheduler.common.utils.ResInfo; +import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; -import java.util.List; import java.util.concurrent.ThreadFactory; @@ -130,21 +129,14 @@ public class ZKWorkerClient extends AbstractZKClient { * register worker */ private void registWorker(){ - // get current date - Date now = new Date(); - createTime = now ; try { - if(checkZKNodeExists(OSUtils.getHost(), ZKNodeType.WORKER)){ - logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost()); + String serverPath = registerServer(ZKNodeType.WORKER); + 
if(StringUtils.isEmpty(serverPath)){ System.exit(-1); } - - // create worker zknode - initWorkZNode(); - // handle dead server - handleDeadServer(workerZNode, Constants.WORKER_PREFIX, Constants.DELETE_ZK_OP); } catch (Exception e) { logger.error("register worker failure : " + e.getMessage(),e); + System.exit(-1); } } @@ -167,16 +159,11 @@ public class ZKWorkerClient extends AbstractZKClient { break; case CHILD_REMOVED: String path = event.getData().getPath(); - // handle dead server, add to zk dead server path - handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); - //find myself dead - if(workerZNode.equals(path)){ - - logger.warn(" worker server({}) of myself dead , stopping...", path); - stoppable.stop(String.format("worker server(%s) of myself dead , stopping",path)); - } - logger.info("node deleted : {}", event.getData().getPath()); + String serverHost = getHostByEventDataPath(path); + if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){ + return; + } break; case CHILD_UPDATED: break;