Browse Source

refactor zkMasterClient/zkWorkerClient

pull/2/head
lenboo 5 years ago
parent
commit
47f90e25aa
  1. 12
      escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java
  2. 1
      escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java
  3. 2
      escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java
  4. 26
      escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java
  5. 2
      escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java
  6. 2
      escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java
  7. 7
      escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java
  8. 102
      escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
  9. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java
  10. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java
  11. 2
      escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java
  12. 14
      escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java
  13. 13
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  14. 182
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
  15. 56
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java

12
escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java

@ -20,14 +20,12 @@ import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.ZookeeperMonitor; import cn.escheduler.api.utils.ZookeeperMonitor;
import cn.escheduler.dao.MonitorDBDao; import cn.escheduler.dao.MonitorDBDao;
import cn.escheduler.dao.model.MasterServer; import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.MonitorRecord; import cn.escheduler.dao.model.MonitorRecord;
import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.ZookeeperRecord; import cn.escheduler.dao.model.ZookeeperRecord;
import org.apache.hadoop.mapred.Master;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -65,7 +63,9 @@ public class MonitorService extends BaseService{
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
List<MasterServer> masterServers = new ZookeeperMonitor().getMasterServers(); ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor();
List<MasterServer> masterServers = zookeeperMonitor.getMasterServers();
zookeeperMonitor.close();
result.put(Constants.DATA_LIST, masterServers); result.put(Constants.DATA_LIST, masterServers);
putMsg(result,Status.SUCCESS); putMsg(result,Status.SUCCESS);
@ -99,8 +99,10 @@ public class MonitorService extends BaseService{
public Map<String,Object> queryWorker(User loginUser) { public Map<String,Object> queryWorker(User loginUser) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor();
List<MasterServer> workerServers = zookeeperMonitor.getWorkerServers();
zookeeperMonitor.close();
List<MasterServer> workerServers = new ZookeeperMonitor().getWorkerServers();
result.put(Constants.DATA_LIST, workerServers); result.put(Constants.DATA_LIST, workerServers);
putMsg(result,Status.SUCCESS); putMsg(result,Status.SUCCESS);

1
escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java

@ -25,6 +25,7 @@ import cn.escheduler.common.enums.FailureStrategy;
import cn.escheduler.common.enums.Priority; import cn.escheduler.common.enums.Priority;
import cn.escheduler.common.enums.ReleaseState; import cn.escheduler.common.enums.ReleaseState;
import cn.escheduler.common.enums.WarningType; import cn.escheduler.common.enums.WarningType;
import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ProcessDao;

2
escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java

@ -20,7 +20,7 @@ import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Constants;
import cn.escheduler.dao.mapper.MasterServerMapper; import cn.escheduler.dao.mapper.MasterServerMapper;
import cn.escheduler.dao.mapper.WorkerServerMapper; import cn.escheduler.dao.mapper.WorkerServerMapper;
import cn.escheduler.dao.model.MasterServer; import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.WorkerServer; import cn.escheduler.dao.model.WorkerServer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;

26
escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java

@ -1,9 +1,9 @@
package cn.escheduler.api.utils; package cn.escheduler.api.utils;
import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.model.MasterServer; import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.ZookeeperRecord; import cn.escheduler.dao.model.ZookeeperRecord;
import cn.escheduler.server.ResInfo;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
@ -36,29 +35,12 @@ public class ZookeeperMonitor extends AbstractZKClient{
return null; return null;
} }
/**
* get server list.
* @param isMaster
* @return
*/
public List<MasterServer> getServers(boolean isMaster){
List<MasterServer> masterServers = new ArrayList<>();
Map<String, String> masterMap = getServerList(isMaster);
String parentPath = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath();
for(String path : masterMap.keySet()){
MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path));
masterServer.setZkDirectory( parentPath + "/"+ path);
masterServers.add(masterServer);
}
return masterServers;
}
/** /**
* get master servers * get master servers
* @return * @return
*/ */
public List<MasterServer> getMasterServers(){ public List<MasterServer> getMasterServers(){
return getServers(true); return getServers(ZKNodeType.MASTER);
} }
/** /**
@ -66,7 +48,7 @@ public class ZookeeperMonitor extends AbstractZKClient{
* @return * @return
*/ */
public List<MasterServer> getWorkerServers(){ public List<MasterServer> getWorkerServers(){
return getServers(false); return getServers(ZKNodeType.WORKER);
} }
private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) { private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) {

2
escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java

@ -1,6 +1,6 @@
package cn.escheduler.api.utils; package cn.escheduler.api.utils;
import cn.escheduler.dao.model.MasterServer; import cn.escheduler.common.model.MasterServer;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;

2
escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java → escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package cn.escheduler.dao.model; package cn.escheduler.common.model;
import java.util.Date; import java.util.Date;

7
escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java → escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java

@ -14,13 +14,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package cn.escheduler.server; package cn.escheduler.common.utils;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.model.MasterServer;
import java.util.Date; import java.util.Date;

102
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java

@ -18,19 +18,24 @@ package cn.escheduler.common.zk;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.IStoppable; import cn.escheduler.common.IStoppable;
import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.utils.ResInfo;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -217,7 +222,7 @@ public abstract class AbstractZKClient {
workerZNodeParentPath = getWorkerZNodeParentPath(); workerZNodeParentPath = getWorkerZNodeParentPath();
// read server node parent path from conf // read server node parent path from conf
deadServerZNodeParentPath = conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS); deadServerZNodeParentPath = getDeadZNodeParentPath();
if(zkClient.checkExists().forPath(deadServerZNodeParentPath) == null){ if(zkClient.checkExists().forPath(deadServerZNodeParentPath) == null){
// create persistent dead server parent node // create persistent dead server parent node
@ -243,6 +248,7 @@ public abstract class AbstractZKClient {
} }
public void removeDeadServerByHost(String host, String serverType) throws Exception { public void removeDeadServerByHost(String host, String serverType) throws Exception {
List<String> deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath); List<String> deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath);
for(String serverPath : deadServers){ for(String serverPath : deadServers){
@ -341,16 +347,34 @@ public abstract class AbstractZKClient {
return sb.toString(); return sb.toString();
} }
/**
 * Build the list of registered servers of the given node type.
 *
 * <p>Every entry of {@link #getServerList(ZKNodeType)} is parsed with
 * {@code ResInfo.parseHeartbeatForZKInfo} and tagged with its znode path,
 * i.e. the parent path of the node type + "/" + the entry key.
 *
 * @param zkNodeType type of the server nodes to list
 * @return one {@code MasterServer} per entry of the server list map
 */
public List<MasterServer> getServers(ZKNodeType zkNodeType){
    Map<String, String> heartbeatByNode = getServerList(zkNodeType);
    String parentPath = getZNodeParentPath(zkNodeType);

    List<MasterServer> servers = new ArrayList<>();
    for(Map.Entry<String, String> node : heartbeatByNode.entrySet()){
        MasterServer server = ResInfo.parseHeartbeatForZKInfo(node.getValue());
        // the map key is the child node name, so this is the full znode path
        server.setZkDirectory( parentPath + "/"+ node.getKey());
        servers.add(server);
    }
    return servers;
}
/** /**
* get master server list map. * get master server list map.
* result : {host : resource info} * result : {host : resource info}
* @return * @return
*/ */
public Map<String, String> getServerList(boolean isMaster ){ public Map<String, String> getServerList(ZKNodeType zkNodeType){
Map<String, String> masterMap = new HashMap<>(); Map<String, String> masterMap = new HashMap<>();
try { try {
String path = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath(); String path = getZNodeParentPath(zkNodeType);
List<String> serverList = getZkClient().getChildren().forPath(path); List<String> serverList = getZkClient().getChildren().forPath(path);
for(String server : serverList){ for(String server : serverList){
byte[] bytes = getZkClient().getData().forPath(path + "/" + server); byte[] bytes = getZkClient().getData().forPath(path + "/" + server);
@ -363,6 +387,28 @@ public abstract class AbstractZKClient {
return masterMap; return masterMap;
} }
/**
 * Check whether a server znode for the given host is already registered.
 *
 * <p>Server znodes are created as {@code <host>_<sequence>}, so the host is matched
 * together with the {@code _} separator. A bare prefix match would report host
 * "10.0.0.1" as registered when only "10.0.0.10" is.
 *
 * @param host host to look for; a null or empty host is reported as not existing
 * @param zkNodeType type of node whose parent path is searched
 * @return true if a child node named {@code <host>_...} exists under the parent path
 *         of {@code zkNodeType}; false otherwise, or when the parent path or host is empty
 * @throws Exception declared for callers; kept for interface compatibility
 */
public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception {
    String path = getZNodeParentPath(zkNodeType);
    if(StringUtils.isEmpty(path) || StringUtils.isEmpty(host)){
        logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType);
        return false;
    }

    // keys of the server list map are the child node names (getServers appends them to the parent path)
    String nodePrefix = host + "_";
    for(String nodeName : getServerList(zkNodeType).keySet()){
        if(nodeName.startsWith(nodePrefix)){
            return true;
        }
    }
    return false;
}
/** /**
* get zkclient * get zkclient
* @return * @return
@ -391,6 +437,34 @@ public abstract class AbstractZKClient {
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS); return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS);
} }
/**
 * Resolve the parent znode path under which nodes of the given type are kept.
 *
 * @param zkNodeType node type to resolve
 * @return the parent path for MASTER, WORKER or DEAD_SERVER; an empty string for any other type
 */
public String getZNodeParentPath(ZKNodeType zkNodeType) {
    switch (zkNodeType){
        case MASTER:
            return getMasterZNodeParentPath();
        case WORKER:
            return getWorkerZNodeParentPath();
        case DEAD_SERVER:
            return getDeadZNodeParentPath();
        default:
            // unknown node type: no parent path
            return "";
    }
}
/**
 * Read the parent znode path of dead servers from the configuration.
 *
 * @return the value configured under ZOOKEEPER_ESCHEDULER_DEAD_SERVERS
 */
protected String getDeadZNodeParentPath(){
    final String deadServerParentPath = conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS);
    return deadServerParentPath;
}
/** /**
* get master start up lock path * get master start up lock path
* @return * @return
@ -415,6 +489,26 @@ public abstract class AbstractZKClient {
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS); return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS);
} }
/**
 * Release a distributed lock, logging any failure instead of propagating it.
 *
 * <p>Meant to be called from {@code finally} blocks: a {@code null} mutex (lock never
 * created) is ignored, and no {@code Exception} escapes this method.
 *
 * @param mutex the lock to release, may be null
 */
public static void releaseMutex(InterProcessMutex mutex) {
    if (mutex == null){
        return;
    }
    try {
        mutex.release();
    } catch (Exception e) {
        // Constant-first comparison: getMessage() may be null, and an NPE thrown here
        // would escape the caller's finally block and mask the original failure.
        // NOTE(review): this message presumably comes from a client that is not started
        // (or already closed) - confirm against the Curator version in use.
        if("instance must be started before calling this method".equals(e.getMessage())){
            logger.warn("lock release");
        }else{
            logger.error("lock release failed : " + e.getMessage(),e);
        }
    }
}
@Override @Override
public String toString() { public String toString() {

2
escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java

@ -18,7 +18,7 @@ package cn.escheduler.dao;
import cn.escheduler.dao.mapper.MasterServerMapper; import cn.escheduler.dao.mapper.MasterServerMapper;
import cn.escheduler.dao.mapper.WorkerServerMapper; import cn.escheduler.dao.mapper.WorkerServerMapper;
import cn.escheduler.dao.model.MasterServer; import cn.escheduler.common.model.MasterServer;
import cn.escheduler.dao.model.WorkerServer; import cn.escheduler.dao.model.WorkerServer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java

@ -16,7 +16,7 @@
*/ */
package cn.escheduler.dao.mapper; package cn.escheduler.dao.mapper;
import cn.escheduler.dao.model.MasterServer; import cn.escheduler.common.model.MasterServer;
import org.apache.ibatis.annotations.*; import org.apache.ibatis.annotations.*;
import org.apache.ibatis.type.JdbcType; import org.apache.ibatis.type.JdbcType;

2
escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java

@ -17,7 +17,7 @@
package cn.escheduler.dao.mapper; package cn.escheduler.dao.mapper;
import cn.escheduler.dao.datasource.ConnectionFactory; import cn.escheduler.dao.datasource.ConnectionFactory;
import cn.escheduler.dao.model.MasterServer; import cn.escheduler.common.model.MasterServer;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;

14
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java

@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.thread.Stopper; import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.thread.ThreadUtils; import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.server.zk.ZKMasterClient; import cn.escheduler.server.zk.ZKMasterClient;
@ -98,18 +99,7 @@ public class MasterSchedulerThread implements Runnable {
}catch (Exception e){ }catch (Exception e){
logger.error("master scheduler thread exception : " + e.getMessage(),e); logger.error("master scheduler thread exception : " + e.getMessage(),e);
}finally{ }finally{
if (mutex != null){ AbstractZKClient.releaseMutex(mutex);
try {
mutex.release();
} catch (Exception e) {
if(e.getMessage().equals("instance must be started before calling this method")){
logger.warn("lock release");
}else{
logger.error("lock release failed : " + e.getMessage(),e);
}
}
}
} }
} }
} }

13
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -22,6 +22,7 @@ import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.thread.ThreadUtils; import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.FileUtils; import cn.escheduler.common.utils.FileUtils;
import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.*; import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient; import cn.escheduler.server.zk.ZKWorkerClient;
@ -235,17 +236,7 @@ public class FetchTaskThread implements Runnable{
}catch (Exception e){ }catch (Exception e){
logger.error("fetch task thread exception : " + e.getMessage(),e); logger.error("fetch task thread exception : " + e.getMessage(),e);
}finally { }finally {
if (mutex != null){ AbstractZKClient.releaseMutex(mutex);
try {
mutex.release();
} catch (Exception e) {
if(e.getMessage().equals("instance must be started before calling this method")){
logger.warn("fetch task lock release");
}else{
logger.error("fetch task lock release failed : " + e.getMessage(),e);
}
}
}
} }
} }
} }

182
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

@ -19,8 +19,8 @@ package cn.escheduler.server.zk;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.AlertDao;
@ -30,7 +30,7 @@ import cn.escheduler.dao.ServerDao;
import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.WorkerServer; import cn.escheduler.dao.model.WorkerServer;
import cn.escheduler.server.ResInfo; import cn.escheduler.common.utils.ResInfo;
import cn.escheduler.server.utils.ProcessUtils; import cn.escheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -118,7 +118,6 @@ public class ZKMasterClient extends AbstractZKClient {
try { try {
// create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master // create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master
String znodeLock = getMasterStartUpLockPath(); String znodeLock = getMasterStartUpLockPath();
mutex = new InterProcessMutex(zkClient, znodeLock); mutex = new InterProcessMutex(zkClient, znodeLock);
mutex.acquire(); mutex.acquire();
@ -137,29 +136,19 @@ public class ZKMasterClient extends AbstractZKClient {
// check if fault tolerance is required,failure and tolerance // check if fault tolerance is required,failure and tolerance
if (getActiveMasterNum() == 1) { if (getActiveMasterNum() == 1) {
failoverWorker(null, true); failoverWorker(null, true);
// processDao.masterStartupFaultTolerant();
failoverMaster(null); failoverMaster(null);
} }
}catch (Exception e){ }catch (Exception e){
logger.error("master start up exception : " + e.getMessage(),e); logger.error("master start up exception : " + e.getMessage(),e);
}finally { }finally {
if (mutex != null){ releaseMutex(mutex);
try {
mutex.release();
} catch (Exception e) {
if(e.getMessage().equals("instance must be started before calling this method")){
logger.warn("lock release");
}else{
logger.error("lock release failed : " + e.getMessage(),e);
}
}
}
} }
} }
/** /**
* init dao * init dao
*/ */
@ -202,74 +191,24 @@ public class ZKMasterClient extends AbstractZKClient {
// exit system // exit system
System.exit(-1); System.exit(-1);
} }
createMasterZNode(now);
// specify the format of stored data in ZK nodes
String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now);
// create temporary sequence nodes for master znode
masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes());
logger.info("register master node {} success" , masterZNode); logger.info("register master node {} success" , masterZNode);
// handle dead server // handle dead server
handleDeadServer(masterZNode, Constants.MASTER_PREFIX, Constants.DELETE_ZK_OP); handleDeadServer(masterZNode, Constants.MASTER_PREFIX, Constants.DELETE_ZK_OP);
// delete master server from database
serverDao.deleteMaster(OSUtils.getHost());
// register master znode
serverDao.registerMaster(OSUtils.getHost(),
OSUtils.getProcessID(),
masterZNode,
ResInfo.getResInfoJson(),
createTime,
createTime);
} catch (Exception e) { } catch (Exception e) {
logger.error("register master failure : " + e.getMessage(),e); logger.error("register master failure : " + e.getMessage(),e);
} }
} }
private void createMasterZNode(Date now) throws Exception {
/** // specify the format of stored data in ZK nodes
* check the zookeeper node already exists String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now);
* @param host // create temporary sequence nodes for master znode
* @param zkNodeType masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
* @return masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes());
* @throws Exception
*/
private boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception {
String path = null;
switch (zkNodeType){
case MASTER:
path = masterZNodeParentPath;
break;
case WORKER:
path = workerZNodeParentPath;
break;
case DEAD_SERVER:
path = deadServerZNodeParentPath;
break;
default:
break;
}
if(StringUtils.isEmpty(path)){
logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString());
return false;
} }
List<String> serverList = null;
serverList = zkClient.getChildren().forPath(path);
if (CollectionUtils.isNotEmpty(serverList)){
for (String masterZNode : serverList){
if (masterZNode.startsWith(host)){
return true;
}
}
}
return false;
}
/** /**
* monitor master * monitor master
@ -291,7 +230,22 @@ public class ZKMasterClient extends AbstractZKClient {
case CHILD_REMOVED: case CHILD_REMOVED:
String path = event.getData().getPath(); String path = event.getData().getPath();
logger.info("master node deleted : {}",event.getData().getPath()); logger.info("master node deleted : {}",event.getData().getPath());
removeMasterNode(path);
break;
case CHILD_UPDATED:
break;
default:
break;
}
}
});
}catch (Exception e){
logger.error("monitor master failed : " + e.getMessage(),e);
}
}
private void removeMasterNode(String path) {
InterProcessMutex mutexLock = null; InterProcessMutex mutexLock = null;
try { try {
// handle dead server, add to zk dead server path // handle dead server, add to zk dead server path
@ -300,7 +254,7 @@ public class ZKMasterClient extends AbstractZKClient {
if(masterZNode.equals(path)){ if(masterZNode.equals(path)){
logger.error("master server({}) of myself dead , stopping...", path); logger.error("master server({}) of myself dead , stopping...", path);
stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path)); stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path));
break; return;
} }
// create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master
@ -326,38 +280,9 @@ public class ZKMasterClient extends AbstractZKClient {
} }
} }
} }
break;
case CHILD_UPDATED:
if (event.getData().getPath().contains(OSUtils.getHost())){
byte[] bytes = zkClient.getData().forPath(event.getData().getPath());
String resInfoStr = new String(bytes);
String[] splits = resInfoStr.split(Constants.COMMA);
if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
return;
}
// updateProcessInstance Master information in database according to host
serverDao.updateMaster(OSUtils.getHost(),
OSUtils.getProcessID(),
ResInfo.getResInfoJson(Double.parseDouble(splits[2]),
Double.parseDouble(splits[3])),
DateUtils.stringToDate(splits[5]));
logger.debug("master zk node updated : {}",event.getData().getPath());
}
break;
default:
break;
}
}
});
}catch (Exception e){
logger.error("monitor master failed : " + e.getMessage(),e);
}
} }
/** /**
* monitor worker * monitor worker
*/ */
@ -377,9 +302,20 @@ public class ZKMasterClient extends AbstractZKClient {
break; break;
case CHILD_REMOVED: case CHILD_REMOVED:
String path = event.getData().getPath(); String path = event.getData().getPath();
logger.info("node deleted : {}",event.getData().getPath()); logger.info("node deleted : {}",event.getData().getPath());
removeZKNodePath(path);
break;
default:
break;
}
}
});
}catch (Exception e){
logger.error("listener worker failed : " + e.getMessage(),e);
}
}
private void removeZKNodePath(String path) {
InterProcessMutex mutex = null; InterProcessMutex mutex = null;
try { try {
@ -403,25 +339,9 @@ public class ZKMasterClient extends AbstractZKClient {
logger.error("worker failover failed : " + e.getMessage(),e); logger.error("worker failover failed : " + e.getMessage(),e);
} }
finally { finally {
if (mutex != null){ releaseMutex(mutex);
try {
mutex.release();
} catch (Exception e) {
logger.error("lock relase failed : " + e.getMessage(),e);
}
} }
} }
break;
default:
break;
}
}
});
}catch (Exception e){
logger.error("listener worker failed : " + e.getMessage(),e);
}
}
/** /**
* get master znode * get master znode
@ -431,9 +351,6 @@ public class ZKMasterClient extends AbstractZKClient {
return masterZNode; return masterZNode;
} }
/** /**
* task needs failover if task start before worker starts * task needs failover if task start before worker starts
* *
@ -460,15 +377,20 @@ public class ZKMasterClient extends AbstractZKClient {
* @return * @return
*/ */
private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
if(StringUtils.isEmpty(taskInstance.getHost())){
return false;
}
Date workerServerStartDate = null; Date workerServerStartDate = null;
List<WorkerServer> workerServers = processDao.queryWorkerServerByHost(taskInstance.getHost()); List<MasterServer> workerServers= getServers(ZKNodeType.WORKER);
if(workerServers.size() > 0){ for(MasterServer server : workerServers){
workerServerStartDate = workerServers.get(0).getCreateTime(); if(server.getHost().equals(taskInstance.getHost())){
workerServerStartDate = server.getCreateTime();
break;
}
} }
if(workerServerStartDate != null){ if(workerServerStartDate != null){
return taskInstance.getStartTime().after(workerServerStartDate); return taskInstance.getStartTime().after(workerServerStartDate);
}else{ }else{
return false; return false;
} }
@ -478,6 +400,7 @@ public class ZKMasterClient extends AbstractZKClient {
* failover worker tasks * failover worker tasks
* 1. kill yarn job if there are yarn jobs in tasks. * 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover. * 2. change task state from running to need failover.
* 3. failover all tasks when workerHost is null
* @param workerHost * @param workerHost
*/ */
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
@ -501,9 +424,6 @@ public class ZKMasterClient extends AbstractZKClient {
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processDao.saveTaskInstance(taskInstance); processDao.saveTaskInstance(taskInstance);
} }
//update task Instance state value is NEED_FAULT_TOLERANCE
// processDao.updateNeedFailoverTaskInstances(workerHost);
logger.info("end worker[{}] failover ...", workerHost); logger.info("end worker[{}] failover ...", workerHost);
} }

56
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java

@ -17,13 +17,13 @@
package cn.escheduler.server.zk; package cn.escheduler.server.zk;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ServerDao; import cn.escheduler.dao.ServerDao;
import cn.escheduler.server.ResInfo; import cn.escheduler.common.utils.ResInfo;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@ -130,50 +130,19 @@ public class ZKWorkerClient extends AbstractZKClient {
* register worker * register worker
*/ */
private void registWorker(){ private void registWorker(){
// get current date // get current date
Date now = new Date(); Date now = new Date();
createTime = now ; createTime = now ;
try { try {
if(checkZKNodeExists(OSUtils.getHost(), ZKNodeType.WORKER)){
// encapsulation worker znnode
workerZNode = workerZNodeParentPath + "/" + OSUtils.getHost() + "_";
List<String> workerZNodeList = zkClient.getChildren().forPath(workerZNodeParentPath);
if (CollectionUtils.isNotEmpty(workerZNodeList)){
boolean flag = false;
for (String workerZNode : workerZNodeList){
if (workerZNode.startsWith(OSUtils.getHost())){
flag = true;
break;
}
}
if (flag){
logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost()); logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost());
// exit system
System.exit(-1); System.exit(-1);
} }
}
// String heartbeatZKInfo = getOsInfo(now);
// workerZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(workerZNode,
// heartbeatZKInfo.getBytes());
// create worker zknode
initWorkZNode(); initWorkZNode();
// handle dead server // handle dead server
handleDeadServer(workerZNode, Constants.WORKER_PREFIX, Constants.DELETE_ZK_OP); handleDeadServer(workerZNode, Constants.WORKER_PREFIX, Constants.DELETE_ZK_OP);
// delete worker server from database
serverDao.deleteWorker(OSUtils.getHost());
// register worker znode
serverDao.registerWorker(OSUtils.getHost(),
OSUtils.getProcessID(),
workerZNode,
ResInfo.getResInfoJson(),
createTime,
createTime);
} catch (Exception e) { } catch (Exception e) {
logger.error("register worker failure : " + e.getMessage(),e); logger.error("register worker failure : " + e.getMessage(),e);
} }
@ -198,7 +167,6 @@ public class ZKWorkerClient extends AbstractZKClient {
break; break;
case CHILD_REMOVED: case CHILD_REMOVED:
String path = event.getData().getPath(); String path = event.getData().getPath();
// handle dead server, add to zk dead server path // handle dead server, add to zk dead server path
handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP);
@ -211,22 +179,6 @@ public class ZKWorkerClient extends AbstractZKClient {
logger.info("node deleted : {}", event.getData().getPath()); logger.info("node deleted : {}", event.getData().getPath());
break; break;
case CHILD_UPDATED: case CHILD_UPDATED:
if (event.getData().getPath().contains(OSUtils.getHost())){
byte[] bytes = zkClient.getData().forPath(event.getData().getPath());
String resInfoStr = new String(bytes);
String[] splits = resInfoStr.split(Constants.COMMA);
if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
return;
}
// updateProcessInstance master info in database according to host
serverDao.updateWorker(OSUtils.getHost(),
OSUtils.getProcessID(),
ResInfo.getResInfoJson(Double.parseDouble(splits[2])
,Double.parseDouble(splits[3])),
DateUtils.stringToDate(splits[5]));
logger.debug("node updated : {}",event.getData().getPath());
}
break; break;
default: default:
break; break;

Loading…
Cancel
Save