Browse Source

master startup lock dev

pull/2/head
qiaozhanwei 6 years ago
parent
commit
362fb0e752
  1. 5
      escheduler-common/src/main/java/cn/escheduler/common/Constants.java
  2. 1
      escheduler-common/src/main/resources/zookeeper.properties
  3. 34
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

5
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@ -162,6 +162,11 @@ public final class Constants {
*/
public static final String ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS = "zookeeper.escheduler.lock.failover.workers";
/**
* MasterServer startup failover runing and fault tolerance process
*/
public static final String ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "zookeeper.escheduler.lock.failover.startup.masters";
/**
* need send warn times when master server or worker server failover
*/

1
escheduler-common/src/main/resources/zookeeper.properties

@ -16,6 +16,7 @@ zookeeper.escheduler.lock.workers=/escheduler/lock/workers
#escheduler failover directory
zookeeper.escheduler.lock.failover.masters=/escheduler/lock/failover/masters
zookeeper.escheduler.lock.failover.workers=/escheduler/lock/failover/workers
zookeeper.escheduler.lock.failover.startup.masters=/escheduler/lock/failover/startup-masters
#escheduler failover directory
zookeeper.session.timeout=300

34
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

@ -31,6 +31,7 @@ import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.ResInfo;
import cn.escheduler.server.utils.ProcessUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
@ -111,6 +112,14 @@ public class ZKMasterClient extends AbstractZKClient {
// init dao
this.initDao();
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master
String znodeLock = zkMasterClient.getMasterStartUpLockPath();
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
mutex.acquire();
// init system znode
this.initSystemZNode();
@ -127,6 +136,23 @@ public class ZKMasterClient extends AbstractZKClient {
if (getActiveMasterNum() == 1) {
processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal(),ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal());
}
}catch (Exception e){
logger.error("master scheduler thread exception : " + e.getMessage(),e);
}finally {
if (mutex != null){
try {
mutex.release();
} catch (Exception e) {
if(e.getMessage().equals("instance must be started before calling this method")){
logger.warn("lock release");
}else{
logger.error("lock release failed : " + e.getMessage(),e);
}
}
}
}
}
@ -417,6 +443,14 @@ public class ZKMasterClient extends AbstractZKClient {
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS);
}
/**
* get master start up lock path
* @return
*/
public String getMasterStartUpLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS);
}
/**
* get master failover lock path
* @return

Loading…
Cancel
Save