|
|
|
@ -31,6 +31,7 @@ import cn.escheduler.dao.model.TaskInstance;
|
|
|
|
|
import cn.escheduler.server.ResInfo; |
|
|
|
|
import cn.escheduler.server.utils.ProcessUtils; |
|
|
|
|
import org.apache.curator.framework.CuratorFramework; |
|
|
|
|
import org.apache.curator.framework.imps.CuratorFrameworkState; |
|
|
|
|
import org.apache.curator.framework.recipes.cache.PathChildrenCache; |
|
|
|
|
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; |
|
|
|
|
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; |
|
|
|
@ -111,21 +112,46 @@ public class ZKMasterClient extends AbstractZKClient {
|
|
|
|
|
// init dao
|
|
|
|
|
this.initDao(); |
|
|
|
|
|
|
|
|
|
// init system znode
|
|
|
|
|
this.initSystemZNode(); |
|
|
|
|
InterProcessMutex mutex = null; |
|
|
|
|
try { |
|
|
|
|
// create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master
|
|
|
|
|
String znodeLock = getMasterStartUpLockPath(); |
|
|
|
|
|
|
|
|
|
mutex = new InterProcessMutex(zkClient, znodeLock); |
|
|
|
|
mutex.acquire(); |
|
|
|
|
|
|
|
|
|
// init system znode
|
|
|
|
|
this.initSystemZNode(); |
|
|
|
|
|
|
|
|
|
// monitor master
|
|
|
|
|
this.listenerMaster(); |
|
|
|
|
|
|
|
|
|
// monitor master
|
|
|
|
|
this.listenerMaster(); |
|
|
|
|
// monitor worker
|
|
|
|
|
this.listenerWorker(); |
|
|
|
|
|
|
|
|
|
// monitor worker
|
|
|
|
|
this.listenerWorker(); |
|
|
|
|
// register master
|
|
|
|
|
this.registMaster(); |
|
|
|
|
|
|
|
|
|
// register master
|
|
|
|
|
this.registMaster(); |
|
|
|
|
// check if fault tolerance is required,failure and tolerance
|
|
|
|
|
if (getActiveMasterNum() == 1) { |
|
|
|
|
processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal(),ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
}catch (Exception e){ |
|
|
|
|
logger.error("master start up exception : " + e.getMessage(),e); |
|
|
|
|
}finally { |
|
|
|
|
if (mutex != null){ |
|
|
|
|
try { |
|
|
|
|
mutex.release(); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
if(e.getMessage().equals("instance must be started before calling this method")){ |
|
|
|
|
logger.warn("lock release"); |
|
|
|
|
}else{ |
|
|
|
|
logger.error("lock release failed : " + e.getMessage(),e); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// check if fault tolerance is required,failure and tolerance
|
|
|
|
|
if (getActiveMasterNum() == 1) { |
|
|
|
|
processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal(),ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -417,6 +443,14 @@ public class ZKMasterClient extends AbstractZKClient {
|
|
|
|
|
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* get master start up lock path |
|
|
|
|
* @return |
|
|
|
|
*/ |
|
|
|
|
public String getMasterStartUpLockPath(){ |
|
|
|
|
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* get master failover lock path |
|
|
|
|
* @return |
|
|
|
|