From 362fb0e752168f868c0c63c0acdc46669b9bf5bc Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 23 May 2019 15:24:23 +0800 Subject: [PATCH 1/2] master startup lock dev --- .../java/cn/escheduler/common/Constants.java | 5 ++ .../src/main/resources/zookeeper.properties | 1 + .../escheduler/server/zk/ZKMasterClient.java | 56 +++++++++++++++---- 3 files changed, 51 insertions(+), 11 deletions(-) diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index abf29fdc99..b12145973e 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -162,6 +162,11 @@ public final class Constants { */ public static final String ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS = "zookeeper.escheduler.lock.failover.workers"; + /** + * MasterServer startup failover runing and fault tolerance process + */ + public static final String ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "zookeeper.escheduler.lock.failover.startup.masters"; + /** * need send warn times when master server or worker server failover */ diff --git a/escheduler-common/src/main/resources/zookeeper.properties b/escheduler-common/src/main/resources/zookeeper.properties index 7df397ca90..207bc8230c 100644 --- a/escheduler-common/src/main/resources/zookeeper.properties +++ b/escheduler-common/src/main/resources/zookeeper.properties @@ -16,6 +16,7 @@ zookeeper.escheduler.lock.workers=/escheduler/lock/workers #escheduler failover directory zookeeper.escheduler.lock.failover.masters=/escheduler/lock/failover/masters zookeeper.escheduler.lock.failover.workers=/escheduler/lock/failover/workers +zookeeper.escheduler.lock.failover.startup.masters=/escheduler/lock/failover/startup-masters #escheduler failover directory zookeeper.session.timeout=300 diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index fe3360484c..8d2410098d 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -31,6 +31,7 @@ import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.ResInfo; import cn.escheduler.server.utils.ProcessUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; @@ -111,21 +112,46 @@ public class ZKMasterClient extends AbstractZKClient { // init dao this.initDao(); - // init system znode - this.initSystemZNode(); + InterProcessMutex mutex = null; + try { + // create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master + String znodeLock = zkMasterClient.getMasterStartUpLockPath(); + + mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); + mutex.acquire(); + + // init system znode + this.initSystemZNode(); + + // monitor master + this.listenerMaster(); - // monitor master - this.listenerMaster(); + // monitor worker + this.listenerWorker(); - // monitor worker - this.listenerWorker(); + // register master + this.registMaster(); - // register master - this.registMaster(); + // check if fault tolerance is required,failure and tolerance + if (getActiveMasterNum() == 1) { + processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal(),ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()); + } + + }catch (Exception e){ + logger.error("master scheduler thread exception : " + e.getMessage(),e); + }finally { + if (mutex != null){ + try { + mutex.release(); + } catch (Exception e) { + if(e.getMessage().equals("instance must be started before calling this method")){ + logger.warn("lock release"); + }else{ + logger.error("lock release failed : " + e.getMessage(),e); + } - // check if fault tolerance is required,failure and tolerance - if (getActiveMasterNum() == 1) { - processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal(),ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()); + } + } } } @@ -417,6 +443,14 @@ public class ZKMasterClient extends AbstractZKClient { return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS); } + /** + * get master start up lock path + * @return + */ + public String getMasterStartUpLockPath(){ + return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS); + } + /** * get master failover lock path * @return From 2d617b56c85c9db90a6fec0719f48e851d5c6e8f Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 23 May 2019 15:56:54 +0800 Subject: [PATCH 2/2] master startup lock dev update --- .../main/java/cn/escheduler/server/zk/ZKMasterClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index 8d2410098d..e5ef726f9c 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -115,9 +115,9 @@ public class ZKMasterClient extends AbstractZKClient { InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master - String znodeLock = zkMasterClient.getMasterStartUpLockPath(); + String znodeLock = getMasterStartUpLockPath(); - mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); + mutex = new InterProcessMutex(zkClient, znodeLock); mutex.acquire(); // init system znode @@ -138,7 +138,7 @@ public class ZKMasterClient extends AbstractZKClient { } }catch (Exception e){ - logger.error("master scheduler thread exception : " + e.getMessage(),e); + logger.error("master start up exception : " + e.getMessage(),e); }finally { if (mutex != null){ try {