From 47f90e25aa21b573612e3b822a2eb52898f9ebd6 Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 25 Jul 2019 11:41:22 +0800 Subject: [PATCH] refactor zkMasterClient/zkWorkerClient --- .../api/service/MonitorService.java | 12 +- .../api/service/SchedulerService.java | 1 + .../escheduler/api/service/ServerService.java | 2 +- .../api/utils/ZookeeperMonitor.java | 26 +- .../api/utils/ZookeeperMonitorUtilsTest.java | 2 +- .../common}/model/MasterServer.java | 2 +- .../cn/escheduler/common/utils}/ResInfo.java | 7 +- .../common/zk/AbstractZKClient.java | 102 ++++++- .../java/cn/escheduler/dao/ServerDao.java | 2 +- .../dao/mapper/MasterServerMapper.java | 2 +- .../dao/mapper/MasterServerMapperTest.java | 2 +- .../master/runner/MasterSchedulerThread.java | 14 +- .../server/worker/runner/FetchTaskThread.java | 13 +- .../escheduler/server/zk/ZKMasterClient.java | 256 ++++++------------ .../escheduler/server/zk/ZKWorkerClient.java | 60 +--- 15 files changed, 216 insertions(+), 287 deletions(-) rename {escheduler-dao/src/main/java/cn/escheduler/dao => escheduler-common/src/main/java/cn/escheduler/common}/model/MasterServer.java (98%) rename {escheduler-server/src/main/java/cn/escheduler/server => escheduler-common/src/main/java/cn/escheduler/common/utils}/ResInfo.java (95%) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java index 08e8bf576e..33d69859d2 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java @@ -20,14 +20,12 @@ import cn.escheduler.api.enums.Status; import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.ZookeeperMonitor; import cn.escheduler.dao.MonitorDBDao; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.MonitorRecord; import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.ZookeeperRecord; -import org.apache.hadoop.mapred.Master; import org.springframework.stereotype.Service; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,7 +63,9 @@ public class MonitorService extends BaseService{ Map result = new HashMap<>(5); - List masterServers = new ZookeeperMonitor().getMasterServers(); + ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor(); + List masterServers = zookeeperMonitor.getMasterServers(); + zookeeperMonitor.close(); result.put(Constants.DATA_LIST, masterServers); putMsg(result,Status.SUCCESS); @@ -99,8 +99,10 @@ public class MonitorService extends BaseService{ public Map queryWorker(User loginUser) { Map result = new HashMap<>(5); + ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor(); + List workerServers = zookeeperMonitor.getWorkerServers(); + zookeeperMonitor.close(); - List workerServers = new ZookeeperMonitor().getWorkerServers(); result.put(Constants.DATA_LIST, workerServers); putMsg(result,Status.SUCCESS); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java index d4515c79f3..64fc8c76eb 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java @@ -25,6 +25,7 @@ import cn.escheduler.common.enums.FailureStrategy; import cn.escheduler.common.enums.Priority; import cn.escheduler.common.enums.ReleaseState; import cn.escheduler.common.enums.WarningType; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.dao.ProcessDao; diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java index 6957075dda..a54b385b0d 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java @@ -20,7 +20,7 @@ import cn.escheduler.api.enums.Status; import cn.escheduler.api.utils.Constants; import cn.escheduler.dao.mapper.MasterServerMapper; import cn.escheduler.dao.mapper.WorkerServerMapper; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.WorkerServer; import org.springframework.beans.factory.annotation.Autowired; diff --git a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java index 0f44b5f7db..ddbb6c9c1a 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java @@ -1,9 +1,9 @@ package cn.escheduler.api.utils; +import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.zk.AbstractZKClient; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.ZookeeperRecord; -import cn.escheduler.server.ResInfo; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Map; /** @@ -36,29 +35,12 @@ public class ZookeeperMonitor extends AbstractZKClient{ return null; } - /** - * get server list. - * @param isMaster - * @return - */ - public List getServers(boolean isMaster){ - List masterServers = new ArrayList<>(); - Map masterMap = getServerList(isMaster); - String parentPath = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath(); - for(String path : masterMap.keySet()){ - MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path)); - masterServer.setZkDirectory( parentPath + "/"+ path); - masterServers.add(masterServer); - } - return masterServers; - } - /** * get master servers * @return */ public List getMasterServers(){ - return getServers(true); + return getServers(ZKNodeType.MASTER); } /** @@ -66,7 +48,7 @@ public class ZookeeperMonitor extends AbstractZKClient{ * @return */ public List getWorkerServers(){ - return getServers(false); + return getServers(ZKNodeType.WORKER); } private static List zookeeperInfoList(String zookeeperServers) { diff --git a/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java b/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java index 87a26ba449..4feb06ad5c 100644 --- a/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java +++ b/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java @@ -1,6 +1,6 @@ package cn.escheduler.api.utils; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import org.junit.Assert; import org.junit.Test; diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java b/escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java similarity index 98% rename from escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java rename to escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java index 1ccb15b5cc..bb2f38cb14 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cn.escheduler.dao.model; +package cn.escheduler.common.model; import java.util.Date; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java similarity index 95% rename from escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java rename to escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java index 844c7be8b0..6a48d6bc89 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java @@ -14,13 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cn.escheduler.server; +package cn.escheduler.common.utils; import cn.escheduler.common.Constants; -import cn.escheduler.common.utils.DateUtils; -import cn.escheduler.common.utils.JSONUtils; -import cn.escheduler.common.utils.OSUtils; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import java.util.Date; diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 3c58996298..79f26c0e82 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -18,19 +18,24 @@ package cn.escheduler.common.zk; import cn.escheduler.common.Constants; import cn.escheduler.common.IStoppable; +import cn.escheduler.common.enums.ZKNodeType; +import cn.escheduler.common.model.MasterServer; +import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; +import cn.escheduler.common.utils.ResInfo; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,7 +222,7 @@ public abstract class AbstractZKClient { workerZNodeParentPath = getWorkerZNodeParentPath(); // read server node parent path from conf - deadServerZNodeParentPath = conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS); + deadServerZNodeParentPath = getDeadZNodeParentPath(); if(zkClient.checkExists().forPath(deadServerZNodeParentPath) == null){ // create persistent dead server parent node @@ -243,6 +248,7 @@ public abstract class AbstractZKClient { } + public void removeDeadServerByHost(String host, String serverType) throws Exception { List deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath); for(String serverPath : deadServers){ @@ -341,16 +347,34 @@ public abstract class AbstractZKClient { return sb.toString(); } + /** + * get server list. + * @param zkNodeType + * @return + */ + public List getServers(ZKNodeType zkNodeType){ + Map masterMap = getServerList(zkNodeType); + String parentPath = getZNodeParentPath(zkNodeType); + + List masterServers = new ArrayList<>(); + for(String path : masterMap.keySet()){ + MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path)); + masterServer.setZkDirectory( parentPath + "/"+ path); + masterServers.add(masterServer); + } + return masterServers; + } + /** * get master server list map. * result : {host : resource info} * @return */ - public Map getServerList(boolean isMaster ){ + public Map getServerList(ZKNodeType zkNodeType){ Map masterMap = new HashMap<>(); try { - String path = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath(); + String path = getZNodeParentPath(zkNodeType); List serverList = getZkClient().getChildren().forPath(path); for(String server : serverList){ byte[] bytes = getZkClient().getData().forPath(path + "/" + server); @@ -363,6 +387,28 @@ public abstract class AbstractZKClient { return masterMap; } + /** + * check the zookeeper node already exists + * @param host + * @param zkNodeType + * @return + * @throws Exception + */ + public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception { + String path = getZNodeParentPath(zkNodeType); + if(StringUtils.isEmpty(path)){ + logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString()); + return false; + } + Map serverMaps = getServerList(zkNodeType); + for(String hostKey : serverMaps.keySet()){ + if(hostKey.startsWith(host)){ + return true; + } + } + return false; + } + /** * get zkclient * @return @@ -391,6 +437,34 @@ public abstract class AbstractZKClient { return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS); } + /** + * get zookeeper node parent path + * @param zkNodeType + * @return + */ + public String getZNodeParentPath(ZKNodeType zkNodeType) { + String path = ""; + switch (zkNodeType){ + case MASTER: + return getMasterZNodeParentPath(); + case WORKER: + return getWorkerZNodeParentPath(); + case DEAD_SERVER: + return getDeadZNodeParentPath(); + default: + break; + } + return path; + } + + /** + * get dead server node parent path + * @return + */ + protected String getDeadZNodeParentPath(){ + return conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS); + } + /** * get master start up lock path * @return @@ -415,6 +489,26 @@ public abstract class AbstractZKClient { return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS); } + /** + * release mutex + * @param mutex + */ + public static void releaseMutex(InterProcessMutex mutex) { + if (mutex != null){ + try { + mutex.release(); + } catch (Exception e) { + if(e.getMessage().equals("instance must be started before calling this method")){ + logger.warn("lock release"); + }else{ + logger.error("lock release failed : " + e.getMessage(),e); + } + + } + } + } + + @Override public String toString() { diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java index 82c71e2855..36823d8b25 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java @@ -18,7 +18,7 @@ package cn.escheduler.dao; import cn.escheduler.dao.mapper.MasterServerMapper; import cn.escheduler.dao.mapper.WorkerServerMapper; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.WorkerServer; import org.springframework.beans.factory.annotation.Autowired; diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java index 4fec8092de..8fbfb5298e 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java @@ -16,7 +16,7 @@ */ package cn.escheduler.dao.mapper; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import org.apache.ibatis.annotations.*; import org.apache.ibatis.type.JdbcType; diff --git a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java index 9f66069eeb..f74683d149 100644 --- a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java +++ b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java @@ -17,7 +17,7 @@ package cn.escheduler.dao.mapper; import cn.escheduler.dao.datasource.ConnectionFactory; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java index b845e19ae0..8562337554 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java @@ -20,6 +20,7 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.thread.Stopper; import cn.escheduler.common.thread.ThreadUtils; import cn.escheduler.common.utils.OSUtils; +import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.server.zk.ZKMasterClient; @@ -98,18 +99,7 @@ public class MasterSchedulerThread implements Runnable { }catch (Exception e){ logger.error("master scheduler thread exception : " + e.getMessage(),e); }finally{ - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - if(e.getMessage().equals("instance must be started before calling this method")){ - logger.warn("lock release"); - }else{ - logger.error("lock release failed : " + e.getMessage(),e); - } - - } - } + AbstractZKClient.releaseMutex(mutex); } } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 5495096161..8f23895547 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -22,6 +22,7 @@ import cn.escheduler.common.thread.Stopper; import cn.escheduler.common.thread.ThreadUtils; import cn.escheduler.common.utils.FileUtils; import cn.escheduler.common.utils.OSUtils; +import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.*; import cn.escheduler.server.zk.ZKWorkerClient; @@ -235,17 +236,7 @@ public class FetchTaskThread implements Runnable{ }catch (Exception e){ logger.error("fetch task thread exception : " + e.getMessage(),e); }finally { - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - if(e.getMessage().equals("instance must be started before calling this method")){ - logger.warn("fetch task lock release"); - }else{ - logger.error("fetch task lock release failed : " + e.getMessage(),e); - } - } - } + AbstractZKClient.releaseMutex(mutex); } } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index 3596155dd3..50ff76a024 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -19,8 +19,8 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.ZKNodeType; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.common.utils.CollectionUtils; -import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.AlertDao; @@ -30,7 +30,7 @@ import cn.escheduler.dao.ServerDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.WorkerServer; -import cn.escheduler.server.ResInfo; +import cn.escheduler.common.utils.ResInfo; import cn.escheduler.server.utils.ProcessUtils; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; @@ -118,7 +118,6 @@ public class ZKMasterClient extends AbstractZKClient { try { // create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master String znodeLock = getMasterStartUpLockPath(); - mutex = new InterProcessMutex(zkClient, znodeLock); mutex.acquire(); @@ -137,29 +136,19 @@ public class ZKMasterClient extends AbstractZKClient { // check if fault tolerance is required,failure and tolerance if (getActiveMasterNum() == 1) { failoverWorker(null, true); -// processDao.masterStartupFaultTolerant(); failoverMaster(null); } }catch (Exception e){ logger.error("master start up exception : " + e.getMessage(),e); }finally { - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - if(e.getMessage().equals("instance must be started before calling this method")){ - logger.warn("lock release"); - }else{ - logger.error("lock release failed : " + e.getMessage(),e); - } - - } - } + releaseMutex(mutex); } } + + /** * init dao */ @@ -202,75 +191,25 @@ public class ZKMasterClient extends AbstractZKClient { // exit system System.exit(-1); } - - // specify the format of stored data in ZK nodes - String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now); - // create temporary sequence nodes for master znode - masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( - masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes()); - + createMasterZNode(now); logger.info("register master node {} success" , masterZNode); // handle dead server handleDeadServer(masterZNode, Constants.MASTER_PREFIX, Constants.DELETE_ZK_OP); - - // delete master server from database - serverDao.deleteMaster(OSUtils.getHost()); - - // register master znode - serverDao.registerMaster(OSUtils.getHost(), - OSUtils.getProcessID(), - masterZNode, - ResInfo.getResInfoJson(), - createTime, - createTime); - } catch (Exception e) { logger.error("register master failure : " + e.getMessage(),e); } } - - /** - * check the zookeeper node already exists - * @param host - * @param zkNodeType - * @return - * @throws Exception - */ - private boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception { - - String path = null; - switch (zkNodeType){ - case MASTER: - path = masterZNodeParentPath; - break; - case WORKER: - path = workerZNodeParentPath; - break; - case DEAD_SERVER: - path = deadServerZNodeParentPath; - break; - default: - break; - } - if(StringUtils.isEmpty(path)){ - logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString()); - return false; - } - - List serverList = null; - serverList = zkClient.getChildren().forPath(path); - if (CollectionUtils.isNotEmpty(serverList)){ - for (String masterZNode : serverList){ - if (masterZNode.startsWith(host)){ - return true; - } - } - } - return false; + private void createMasterZNode(Date now) throws Exception { + // specify the format of stored data in ZK nodes + String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now); + // create temporary sequence nodes for master znode + masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( + masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes()); } + /** * monitor master */ @@ -291,59 +230,9 @@ public class ZKMasterClient extends AbstractZKClient { case CHILD_REMOVED: String path = event.getData().getPath(); logger.info("master node deleted : {}",event.getData().getPath()); - - InterProcessMutex mutexLock = null; - try { - // handle dead server, add to zk dead server pth - handleDeadServer(path, Constants.MASTER_PREFIX, Constants.ADD_ZK_OP); - - if(masterZNode.equals(path)){ - logger.error("master server({}) of myself dead , stopping...", path); - stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path)); - break; - } - - // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master - String znodeLock = zkMasterClient.getMasterFailoverLockPath(); - mutexLock = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); - mutexLock.acquire(); - - String masterHost = getHostByEventDataPath(path); - for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, masterHost, "Master-Server"); - } - if(StringUtils.isNotEmpty(masterHost)){ - failoverMaster(masterHost); - } - }catch (Exception e){ - logger.error("master failover failed : " + e.getMessage(),e); - }finally { - if (mutexLock != null){ - try { - mutexLock.release(); - } catch (Exception e) { - logger.error("lock relase failed : " + e.getMessage(),e); - } - } - } + removeMasterNode(path); break; case CHILD_UPDATED: - if (event.getData().getPath().contains(OSUtils.getHost())){ - byte[] bytes = zkClient.getData().forPath(event.getData().getPath()); - String resInfoStr = new String(bytes); - String[] splits = resInfoStr.split(Constants.COMMA); - if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) { - return; - } - // updateProcessInstance Master information in database according to host - serverDao.updateMaster(OSUtils.getHost(), - OSUtils.getProcessID(), - ResInfo.getResInfoJson(Double.parseDouble(splits[2]), - Double.parseDouble(splits[3])), - DateUtils.stringToDate(splits[5])); - - logger.debug("master zk node updated : {}",event.getData().getPath()); - } break; default: break; @@ -356,6 +245,42 @@ public class ZKMasterClient extends AbstractZKClient { } + private void removeMasterNode(String path) { + InterProcessMutex mutexLock = null; + try { + // handle dead server, add to zk dead server pth + handleDeadServer(path, Constants.MASTER_PREFIX, Constants.ADD_ZK_OP); + + if(masterZNode.equals(path)){ + logger.error("master server({}) of myself dead , stopping...", path); + stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path)); + return; + } + + // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master + String znodeLock = zkMasterClient.getMasterFailoverLockPath(); + mutexLock = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); + mutexLock.acquire(); + + String masterHost = getHostByEventDataPath(path); + for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { + alertDao.sendServerStopedAlert(1, masterHost, "Master-Server"); + } + if(StringUtils.isNotEmpty(masterHost)){ + failoverMaster(masterHost); + } + }catch (Exception e){ + logger.error("master failover failed : " + e.getMessage(),e); + }finally { + if (mutexLock != null){ + try { + mutexLock.release(); + } catch (Exception e) { + logger.error("lock relase failed : " + e.getMessage(),e); + } + } + } + } /** @@ -377,40 +302,8 @@ public class ZKMasterClient extends AbstractZKClient { break; case CHILD_REMOVED: String path = event.getData().getPath(); - logger.info("node deleted : {}",event.getData().getPath()); - - InterProcessMutex mutex = null; - try { - - // handle dead server - handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); - - // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/worker - String znodeLock = zkMasterClient.getWorkerFailoverLockPath(); - mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); - mutex.acquire(); - - String workerHost = getHostByEventDataPath(path); - for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server"); - } - - if(StringUtils.isNotEmpty(workerHost)){ - failoverWorker(workerHost, true); - } - }catch (Exception e){ - logger.error("worker failover failed : " + e.getMessage(),e); - } - finally { - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - logger.error("lock relase failed : " + e.getMessage(),e); - } - } - } + removeZKNodePath(path); break; default: break; @@ -420,7 +313,34 @@ public class ZKMasterClient extends AbstractZKClient { }catch (Exception e){ logger.error("listener worker failed : " + e.getMessage(),e); } + } + + private void removeZKNodePath(String path) { + InterProcessMutex mutex = null; + try { + + // handle dead server + handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); + + // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/worker + String znodeLock = zkMasterClient.getWorkerFailoverLockPath(); + mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); + mutex.acquire(); + + String workerHost = getHostByEventDataPath(path); + for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { + alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server"); + } + if(StringUtils.isNotEmpty(workerHost)){ + failoverWorker(workerHost, true); + } + }catch (Exception e){ + logger.error("worker failover failed : " + e.getMessage(),e); + } + finally { + releaseMutex(mutex); + } } /** @@ -431,9 +351,6 @@ public class ZKMasterClient extends AbstractZKClient { return masterZNode; } - - - /** * task needs failover if task start before worker starts * @@ -460,15 +377,20 @@ public class ZKMasterClient extends AbstractZKClient { * @return */ private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { + if(StringUtils.isEmpty(taskInstance.getHost())){ + return false; + } Date workerServerStartDate = null; - List workerServers = processDao.queryWorkerServerByHost(taskInstance.getHost()); - if(workerServers.size() > 0){ - workerServerStartDate = workerServers.get(0).getCreateTime(); + List workerServers= getServers(ZKNodeType.WORKER); + for(MasterServer server : workerServers){ + if(server.getHost().equals(taskInstance.getHost())){ + workerServerStartDate = server.getCreateTime(); + break; + } } if(workerServerStartDate != null){ return taskInstance.getStartTime().after(workerServerStartDate); - }else{ return false; } @@ -478,6 +400,7 @@ public class ZKMasterClient extends AbstractZKClient { * failover worker tasks * 1. kill yarn job if there are yarn jobs in tasks. * 2. change task state from running to need failover. + * 3. failover all tasks when workerHost is null * @param workerHost */ private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { @@ -501,9 +424,6 @@ public class ZKMasterClient extends AbstractZKClient { taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); processDao.saveTaskInstance(taskInstance); } - - //update task Instance state value is NEED_FAULT_TOLERANCE - // processDao.updateNeedFailoverTaskInstances(workerHost); logger.info("end worker[{}] failover ...", workerHost); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java index e00d72da24..2090ec1bdd 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java @@ -17,13 +17,13 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; +import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.utils.CollectionUtils; -import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.ServerDao; -import cn.escheduler.server.ResInfo; +import cn.escheduler.common.utils.ResInfo; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -130,50 +130,19 @@ public class ZKWorkerClient extends AbstractZKClient { * register worker */ private void registWorker(){ - // get current date Date now = new Date(); createTime = now ; try { - - // encapsulation worker znnode - workerZNode = workerZNodeParentPath + "/" + OSUtils.getHost() + "_"; - List workerZNodeList = zkClient.getChildren().forPath(workerZNodeParentPath); - - if (CollectionUtils.isNotEmpty(workerZNodeList)){ - boolean flag = false; - for (String workerZNode : workerZNodeList){ - if (workerZNode.startsWith(OSUtils.getHost())){ - flag = true; - break; - } - } - - if (flag){ - logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost()); - // exit system - System.exit(-1); - } + if(checkZKNodeExists(OSUtils.getHost(), ZKNodeType.WORKER)){ + logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost()); + System.exit(-1); } -// String heartbeatZKInfo = getOsInfo(now); -// workerZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(workerZNode, -// heartbeatZKInfo.getBytes()); - + // create worker zknode initWorkZNode(); // handle dead server handleDeadServer(workerZNode, Constants.WORKER_PREFIX, Constants.DELETE_ZK_OP); - - // delete worker server from database - serverDao.deleteWorker(OSUtils.getHost()); - - // register worker znode - serverDao.registerWorker(OSUtils.getHost(), - OSUtils.getProcessID(), - workerZNode, - ResInfo.getResInfoJson(), - createTime, - createTime); } catch (Exception e) { logger.error("register worker failure : " + e.getMessage(),e); } @@ -198,7 +167,6 @@ public class ZKWorkerClient extends AbstractZKClient { break; case CHILD_REMOVED: String path = event.getData().getPath(); - // handle dead server, add to zk dead server path handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); @@ -211,22 +179,6 @@ public class ZKWorkerClient extends AbstractZKClient { logger.info("node deleted : {}", event.getData().getPath()); break; case CHILD_UPDATED: - if (event.getData().getPath().contains(OSUtils.getHost())){ - byte[] bytes = zkClient.getData().forPath(event.getData().getPath()); - String resInfoStr = new String(bytes); - String[] splits = resInfoStr.split(Constants.COMMA); - if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) { - return; - } - - // updateProcessInstance master info in database according to host - serverDao.updateWorker(OSUtils.getHost(), - OSUtils.getProcessID(), - ResInfo.getResInfoJson(Double.parseDouble(splits[2]) - ,Double.parseDouble(splits[3])), - DateUtils.stringToDate(splits[5])); - logger.debug("node updated : {}",event.getData().getPath()); - } break; default: break;