Browse Source

refactor zk client.

pull/2/head
lenboo 5 years ago
parent
commit
e4210d1451
  1. 8
      escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java
  2. 4
      escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java
  3. 92
      escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
  4. 19
      escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
  5. 193
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
  6. 27
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java

8
escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java

@ -65,7 +65,7 @@ public class MonitorService extends BaseService{
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
List<MasterServer> masterServers = getServerList(true); List<MasterServer> masterServers = getServerListFromZK(true);
result.put(Constants.DATA_LIST, masterServers); result.put(Constants.DATA_LIST, masterServers);
putMsg(result,Status.SUCCESS); putMsg(result,Status.SUCCESS);
@ -99,7 +99,7 @@ public class MonitorService extends BaseService{
public Map<String,Object> queryWorker(User loginUser) { public Map<String,Object> queryWorker(User loginUser) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
List<MasterServer> workerServers = getServerList(false); List<MasterServer> workerServers = getServerListFromZK(false);
result.put(Constants.DATA_LIST, workerServers); result.put(Constants.DATA_LIST, workerServers);
putMsg(result,Status.SUCCESS); putMsg(result,Status.SUCCESS);
@ -107,13 +107,13 @@ public class MonitorService extends BaseService{
return result; return result;
} }
private List<MasterServer> getServerList(boolean isMaster){ private List<MasterServer> getServerListFromZK(boolean isMaster){
List<MasterServer> servers = new ArrayList<>(); List<MasterServer> servers = new ArrayList<>();
ZookeeperMonitor zookeeperMonitor = null; ZookeeperMonitor zookeeperMonitor = null;
try{ try{
zookeeperMonitor = new ZookeeperMonitor(); zookeeperMonitor = new ZookeeperMonitor();
ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER; ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
servers = zookeeperMonitor.getServers(zkNodeType); servers = zookeeperMonitor.getServersList(zkNodeType);
}catch (Exception e){ }catch (Exception e){
throw e; throw e;
}finally { }finally {

4
escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java

@ -40,7 +40,7 @@ public class ZookeeperMonitor extends AbstractZKClient{
* @return * @return
*/ */
public List<MasterServer> getMasterServers(){ public List<MasterServer> getMasterServers(){
return getServers(ZKNodeType.MASTER); return getServersList(ZKNodeType.MASTER);
} }
/** /**
@ -48,7 +48,7 @@ public class ZookeeperMonitor extends AbstractZKClient{
* @return * @return
*/ */
public List<MasterServer> getWorkerServers(){ public List<MasterServer> getWorkerServers(){
return getServers(ZKNodeType.WORKER); return getServersList(ZKNodeType.WORKER);
} }
private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) { private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) {

92
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java

@ -20,7 +20,6 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.IStoppable; import cn.escheduler.common.IStoppable;
import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.model.MasterServer; import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.utils.ResInfo; import cn.escheduler.common.utils.ResInfo;
@ -223,28 +222,68 @@ public abstract class AbstractZKClient {
} }
} }
/**
* create zookeeper path according the zk node type.
* @param zkNodeType
* @return
* @throws Exception
*/
private String createZNodePath(ZKNodeType zkNodeType) throws Exception {
// specify the format of stored data in ZK nodes
String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
// create temporary sequence nodes for master znode
String parentPath = getZNodeParentPath(zkNodeType);
String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
serverPathPrefix + "_", heartbeatZKInfo.getBytes());
logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
return registerPath;
}
/**
* register server, if server already exists, return null.
* @param zkNodeType
* @return register server path in zookeeper
*/
public String registerServer(ZKNodeType zkNodeType) throws Exception {
String registerPath = null;
String host = OSUtils.getHost();
if(checkZKNodeExists(host, zkNodeType)){
logger.error("register failure , {} server already started on host : {}" ,
zkNodeType.toString(), host);
return registerPath;
}
registerPath = createZNodePath(ZKNodeType.MASTER);
// handle dead server
handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
return registerPath;
}
/** /**
* opType(add): if find dead server , then add to zk deadServerPath * opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk * opType(delete): delete path from zk
* *
* @param zNode node path * @param zNode node path
* @param serverType master or worker prefix * @param zkNodeType master or worker
* @param opType delete or add * @param opType delete or add
* @throws Exception * @throws Exception
*/ */
public void handleDeadServer(String zNode, String serverType, String opType) throws Exception { public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
//ip_sequenceno //ip_sequenceno
String[] zNodesPath = zNode.split("\\/"); String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1]; String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete //check server restart, if restart , dead server path in zk should be delete
if(opType.equals(DELETE_ZK_OP)){ if(opType.equals(DELETE_ZK_OP)){
String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE); String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE);
String ip = ipAndSeqNo[0]; String ip = ipAndSeqNo[0];
removeDeadServerByHost(ip, serverType); removeDeadServerByHost(ip, type);
}else if(opType.equals(ADD_ZK_OP)){ }else if(opType.equals(ADD_ZK_OP)){
String deadServerPath = deadServerZNodeParentPath + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; String deadServerPath = deadServerZNodeParentPath + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
@ -253,7 +292,8 @@ public abstract class AbstractZKClient {
zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes()); zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes());
logger.info("{} server dead , and {} added to zk dead server path success" , serverType, zNode); logger.info("{} server dead , and {} added to zk dead server path success" ,
zkNodeType.toString(), zNode);
} }
} }
@ -314,8 +354,8 @@ public abstract class AbstractZKClient {
* @param zkNodeType * @param zkNodeType
* @return * @return
*/ */
public List<MasterServer> getServers(ZKNodeType zkNodeType){ public List<MasterServer> getServersList(ZKNodeType zkNodeType){
Map<String, String> masterMap = getServerList(zkNodeType); Map<String, String> masterMap = getServerMaps(zkNodeType);
String parentPath = getZNodeParentPath(zkNodeType); String parentPath = getZNodeParentPath(zkNodeType);
List<MasterServer> masterServers = new ArrayList<>(); List<MasterServer> masterServers = new ArrayList<>();
@ -332,7 +372,7 @@ public abstract class AbstractZKClient {
* result : {host : resource info} * result : {host : resource info}
* @return * @return
*/ */
public Map<String, String> getServerList(ZKNodeType zkNodeType){ public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
Map<String, String> masterMap = new HashMap<>(); Map<String, String> masterMap = new HashMap<>();
try { try {
@ -363,7 +403,7 @@ public abstract class AbstractZKClient {
host, zkNodeType.toString()); host, zkNodeType.toString());
return false; return false;
} }
Map<String, String> serverMaps = getServerList(zkNodeType); Map<String, String> serverMaps = getServerMaps(zkNodeType);
for(String hostKey : serverMaps.keySet()){ for(String hostKey : serverMaps.keySet()){
if(hostKey.startsWith(host)){ if(hostKey.startsWith(host)){
return true; return true;
@ -497,6 +537,38 @@ public abstract class AbstractZKClient {
} }
} }
/**
* server self dead, stop all threads
* @param serverHost
* @param zkNodeType
*/
protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
if (serverHost.equals(OSUtils.getHost())) {
logger.error("{} server({}) of myself dead , stopping...",
zkNodeType.toString(), serverHost);
stoppable.stop(String.format(" {} server {} of myself dead , stopping...",
zkNodeType.toString(), serverHost));
return true;
}
return false;
}
/**
* get host ip, string format: masterParentPath/ip_000001/value
* @param path
* @return
*/
protected String getHostByEventDataPath(String path) {
int startIndex = path.lastIndexOf("/")+1;
int endIndex = path.lastIndexOf("_");
if(startIndex >= endIndex){
logger.error("parse ip error");
return "";
}
return path.substring(startIndex, endIndex);
}
@Override @Override
public String toString() { public String toString() {
return "AbstractZKClient{" + return "AbstractZKClient{" +

19
escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java

@ -119,8 +119,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {
public MasterServer(ProcessDao processDao){ public MasterServer(ProcessDao processDao){
zkMasterClient = ZKMasterClient.getZKMasterClient(processDao); zkMasterClient = ZKMasterClient.getZKMasterClient(processDao);
this.serverDao = zkMasterClient.getServerDao();
this.alertDao = zkMasterClient.getAlertDao();
} }
public void run(ProcessDao processDao){ public void run(ProcessDao processDao){
@ -128,6 +126,11 @@ public class MasterServer implements CommandLineRunner, IStoppable {
heartBeatInterval = conf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL, heartBeatInterval = conf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL,
Constants.defaultMasterHeartbeatInterval); Constants.defaultMasterHeartbeatInterval);
// master exec thread pool num
int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS,
Constants.defaultMasterExecThreadNum);
heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum); heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum);
// heartbeat thread implement // heartbeat thread implement
@ -140,10 +143,6 @@ public class MasterServer implements CommandLineRunner, IStoppable {
heartbeatMasterService. heartbeatMasterService.
scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS); scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS);
// master exec thread pool num
int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS,
Constants.defaultMasterExecThreadNum);
// master scheduler thread // master scheduler thread
MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread( MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread(
zkMasterClient, zkMasterClient,
@ -154,6 +153,8 @@ public class MasterServer implements CommandLineRunner, IStoppable {
masterSchedulerService.execute(masterSchedulerThread); masterSchedulerService.execute(masterSchedulerThread);
// start QuartzExecutors // start QuartzExecutors
// TODO...
// what system should do if exception
try { try {
ProcessScheduleJob.init(processDao); ProcessScheduleJob.init(processDao);
QuartzExecutors.getInstance().start(); QuartzExecutors.getInstance().start();
@ -173,13 +174,11 @@ public class MasterServer implements CommandLineRunner, IStoppable {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
String host = OSUtils.getHost();
// clear master table register info
serverDao.deleteMaster(host);
logger.info("master server stopped"); logger.info("master server stopped");
if (zkMasterClient.getActiveMasterNum() <= 1) { if (zkMasterClient.getActiveMasterNum() <= 1) {
for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
alertDao.sendServerStopedAlert(1, host, "Master-Server"); zkMasterClient.getAlertDao().sendServerStopedAlert(
1, OSUtils.getHost(), "Master-Server");
} }
} }
} }

193
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

@ -20,8 +20,6 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.model.MasterServer; import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.DaoFactory;
@ -29,8 +27,6 @@ import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.ServerDao; import cn.escheduler.dao.ServerDao;
import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.WorkerServer;
import cn.escheduler.common.utils.ResInfo;
import cn.escheduler.server.utils.ProcessUtils; import cn.escheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -39,7 +35,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -68,6 +63,7 @@ public class ZKMasterClient extends AbstractZKClient {
* master database access * master database access
*/ */
private ServerDao serverDao = null; private ServerDao serverDao = null;
/** /**
* alert database access * alert database access
*/ */
@ -77,9 +73,6 @@ public class ZKMasterClient extends AbstractZKClient {
*/ */
private ProcessDao processDao; private ProcessDao processDao;
private Date createTime = null;
/** /**
* zkMasterClient * zkMasterClient
*/ */
@ -131,7 +124,7 @@ public class ZKMasterClient extends AbstractZKClient {
this.listenerWorker(); this.listenerWorker();
// register master // register master
this.registMaster(); this.registerMaster();
// check if fault tolerance is required,failure and tolerance // check if fault tolerance is required,failure and tolerance
if (getActiveMasterNum() == 1) { if (getActiveMasterNum() == 1) {
@ -157,15 +150,6 @@ public class ZKMasterClient extends AbstractZKClient {
this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
} }
/**
* get maste dao
* @return
*/
public ServerDao getServerDao(){
return serverDao;
}
/** /**
* get alert dao * get alert dao
* @return * @return
@ -174,40 +158,24 @@ public class ZKMasterClient extends AbstractZKClient {
return alertDao; return alertDao;
} }
/** /**
* register master znode * register master znode
*/ */
public void registMaster(){ public void registerMaster(){
// get current date
Date now = new Date();
createTime = now ;
try { try {
String osHost = OSUtils.getHost(); String serverPath = registerServer(ZKNodeType.MASTER);
if(StringUtils.isEmpty(serverPath)){
// zookeeper node exists, cannot start a new one.
if(checkZKNodeExists(osHost, ZKNodeType.MASTER)){
logger.error("register failure , master already started on host : {}" , osHost);
// exit system
System.exit(-1); System.exit(-1);
} }
createMasterZNode(now);
logger.info("register master node {} success" , masterZNode);
// handle dead server
handleDeadServer(masterZNode, Constants.MASTER_PREFIX, Constants.DELETE_ZK_OP);
} catch (Exception e) { } catch (Exception e) {
logger.error("register master failure : " + e.getMessage(),e); logger.error("register master failure : " + e.getMessage(),e);
System.exit(-1);
} }
} }
private void createMasterZNode(Date now) throws Exception {
// specify the format of stored data in ZK nodes
String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now);
// create temporary sequence nodes for master znode
masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes());
}
/** /**
@ -217,8 +185,6 @@ public class ZKMasterClient extends AbstractZKClient {
PathChildrenCache masterPc = new PathChildrenCache(zkClient, masterZNodeParentPath, true ,defaultThreadFactory); PathChildrenCache masterPc = new PathChildrenCache(zkClient, masterZNodeParentPath, true ,defaultThreadFactory);
try { try {
Date now = new Date();
createTime = now ;
masterPc.start(); masterPc.start();
masterPc.getListenable().addListener(new PathChildrenCacheListener() { masterPc.getListenable().addListener(new PathChildrenCacheListener() {
@Override @Override
@ -229,8 +195,11 @@ public class ZKMasterClient extends AbstractZKClient {
break; break;
case CHILD_REMOVED: case CHILD_REMOVED:
String path = event.getData().getPath(); String path = event.getData().getPath();
logger.info("master node deleted : {}",event.getData().getPath()); String serverHost = getHostByEventDataPath(path);
removeMasterNode(path); if(checkServerSelfDead(serverHost, ZKNodeType.MASTER)){
return;
}
removeZKNodePath(path, ZKNodeType.MASTER, true);
break; break;
case CHILD_UPDATED: case CHILD_UPDATED:
break; break;
@ -242,46 +211,69 @@ public class ZKMasterClient extends AbstractZKClient {
}catch (Exception e){ }catch (Exception e){
logger.error("monitor master failed : " + e.getMessage(),e); logger.error("monitor master failed : " + e.getMessage(),e);
} }
}
} private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
logger.info("{} node deleted : {}", zkNodeType.toString(), path);
private void removeMasterNode(String path) { InterProcessMutex mutex = null;
InterProcessMutex mutexLock = null;
try { try {
// handle dead server, add to zk dead server pth String failoverPath = getFailoverLockPath(zkNodeType);
handleDeadServer(path, Constants.MASTER_PREFIX, Constants.ADD_ZK_OP); // create a distributed lock
mutex = new InterProcessMutex(getZkClient(), failoverPath);
mutex.acquire();
if(masterZNode.equals(path)){ String serverHost = getHostByEventDataPath(path);
logger.error("master server({}) of myself dead , stopping...", path); // handle dead server
stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path)); handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
return; //alert server down.
alertServerDown(serverHost, zkNodeType);
//failover server
if(failover){
failoverServerWhenDown(serverHost, zkNodeType);
} }
}catch (Exception e){
// create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master logger.error("{} server failover failed.", zkNodeType.toString());
String znodeLock = zkMasterClient.getMasterFailoverLockPath(); logger.error("failover exception : " + e.getMessage(),e);
mutexLock = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
mutexLock.acquire();
String masterHost = getHostByEventDataPath(path);
for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
alertDao.sendServerStopedAlert(1, masterHost, "Master-Server");
} }
if(StringUtils.isNotEmpty(masterHost)){ finally {
failoverMaster(masterHost); releaseMutex(mutex);
} }
}catch (Exception e){
logger.error("master failover failed : " + e.getMessage(),e);
}finally {
if (mutexLock != null){
try {
mutexLock.release();
} catch (Exception e) {
logger.error("lock relase failed : " + e.getMessage(),e);
} }
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
if(StringUtils.isEmpty(serverHost)){
return ;
} }
switch (zkNodeType){
case MASTER:
failoverMaster(serverHost);
break;
case WORKER:
failoverWorker(serverHost, true);
default:
break;
}
}
private String getFailoverLockPath(ZKNodeType zkNodeType){
switch (zkNodeType){
case MASTER:
return getMasterFailoverLockPath();
case WORKER:
return getWorkerFailoverLockPath();
default:
return "";
} }
} }
private void alertServerDown(String serverHost, ZKNodeType zkNodeType) {
String serverType = zkNodeType.toString();
for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER; i++) {
alertDao.sendServerStopedAlert(1, serverHost, serverType);
}
}
/** /**
* monitor worker * monitor worker
@ -290,8 +282,6 @@ public class ZKMasterClient extends AbstractZKClient {
PathChildrenCache workerPc = new PathChildrenCache(zkClient,workerZNodeParentPath,true ,defaultThreadFactory); PathChildrenCache workerPc = new PathChildrenCache(zkClient,workerZNodeParentPath,true ,defaultThreadFactory);
try { try {
Date now = new Date();
createTime = now ;
workerPc.start(); workerPc.start();
workerPc.getListenable().addListener(new PathChildrenCacheListener() { workerPc.getListenable().addListener(new PathChildrenCacheListener() {
@Override @Override
@ -303,7 +293,7 @@ public class ZKMasterClient extends AbstractZKClient {
case CHILD_REMOVED: case CHILD_REMOVED:
String path = event.getData().getPath(); String path = event.getData().getPath();
logger.info("node deleted : {}",event.getData().getPath()); logger.info("node deleted : {}",event.getData().getPath());
removeZKNodePath(path); removeZKNodePath(path, ZKNodeType.WORKER, true);
break; break;
default: default:
break; break;
@ -315,33 +305,6 @@ public class ZKMasterClient extends AbstractZKClient {
} }
} }
private void removeZKNodePath(String path) {
InterProcessMutex mutex = null;
try {
// handle dead server
handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP);
// create a distributed lock
String znodeLock = getWorkerFailoverLockPath();
mutex = new InterProcessMutex(getZkClient(), znodeLock);
mutex.acquire();
String workerHost = getHostByEventDataPath(path);
for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server");
}
if(StringUtils.isNotEmpty(workerHost)){
failoverWorker(workerHost, true);
}
}catch (Exception e){
logger.error("worker failover failed : " + e.getMessage(),e);
}
finally {
releaseMutex(mutex);
}
}
/** /**
* get master znode * get master znode
@ -381,7 +344,7 @@ public class ZKMasterClient extends AbstractZKClient {
return false; return false;
} }
Date workerServerStartDate = null; Date workerServerStartDate = null;
List<MasterServer> workerServers= getServers(ZKNodeType.WORKER); List<MasterServer> workerServers= getServersList(ZKNodeType.WORKER);
for(MasterServer server : workerServers){ for(MasterServer server : workerServers){
if(server.getHost().equals(taskInstance.getHost())){ if(server.getHost().equals(taskInstance.getHost())){
workerServerStartDate = server.getCreateTime(); workerServerStartDate = server.getCreateTime();
@ -444,24 +407,4 @@ public class ZKMasterClient extends AbstractZKClient {
logger.info("master failover end"); logger.info("master failover end");
} }
/**
* get host ip, string format: masterParentPath/ip_000001/value
* @param path
* @return
*/
private String getHostByEventDataPath(String path) {
int startIndex = path.lastIndexOf("/")+1;
int endIndex = path.lastIndexOf("_");
if(startIndex >= endIndex){
logger.error("parse ip error");
return "";
}
return path.substring(startIndex, endIndex);
}
} }

27
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java

@ -18,12 +18,12 @@ package cn.escheduler.server.zk;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ServerDao; import cn.escheduler.dao.ServerDao;
import cn.escheduler.common.utils.ResInfo; import cn.escheduler.common.utils.ResInfo;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -130,21 +129,14 @@ public class ZKWorkerClient extends AbstractZKClient {
* register worker * register worker
*/ */
private void registWorker(){ private void registWorker(){
// get current date
Date now = new Date();
createTime = now ;
try { try {
if(checkZKNodeExists(OSUtils.getHost(), ZKNodeType.WORKER)){ String serverPath = registerServer(ZKNodeType.WORKER);
logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost()); if(StringUtils.isEmpty(serverPath)){
System.exit(-1); System.exit(-1);
} }
// create worker zknode
initWorkZNode();
// handle dead server
handleDeadServer(workerZNode, Constants.WORKER_PREFIX, Constants.DELETE_ZK_OP);
} catch (Exception e) { } catch (Exception e) {
logger.error("register worker failure : " + e.getMessage(),e); logger.error("register worker failure : " + e.getMessage(),e);
System.exit(-1);
} }
} }
@ -167,16 +159,11 @@ public class ZKWorkerClient extends AbstractZKClient {
break; break;
case CHILD_REMOVED: case CHILD_REMOVED:
String path = event.getData().getPath(); String path = event.getData().getPath();
// handle dead server, add to zk dead server path
handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP);
//find myself dead //find myself dead
if(workerZNode.equals(path)){ String serverHost = getHostByEventDataPath(path);
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
logger.warn(" worker server({}) of myself dead , stopping...", path); return;
stoppable.stop(String.format("worker server(%s) of myself dead , stopping",path));
} }
logger.info("node deleted : {}", event.getData().getPath());
break; break;
case CHILD_UPDATED: case CHILD_UPDATED:
break; break;

Loading…
Cancel
Save