Browse Source

update

pull/2/head
lenboo 5 years ago
parent
commit
758da8b9a4
  1. 4
      escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java
  2. 41
      escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java
  3. 40
      escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java
  4. 34
      escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java
  5. 10
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
  6. 87
      escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
  7. 2
      escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
  8. 25
      escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java
  9. 39
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
  10. 10
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java
  11. 5
      escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java

4
escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java

@ -66,7 +66,7 @@ public class MonitorController extends BaseController{
logger.info("login user: {}, query all master", loginUser.getUserName()); logger.info("login user: {}, query all master", loginUser.getUserName());
try{ try{
logger.info("list master, user:{}", loginUser.getUserName()); logger.info("list master, user:{}", loginUser.getUserName());
Map<String, Object> result = serverService.queryMaster(loginUser); Map<String, Object> result = monitorService.queryMaster(loginUser);
return returnDataList(result); return returnDataList(result);
}catch (Exception e){ }catch (Exception e){
logger.error(LIST_MASTERS_ERROR.getMsg(),e); logger.error(LIST_MASTERS_ERROR.getMsg(),e);
@ -86,7 +86,7 @@ public class MonitorController extends BaseController{
public Result listWorker(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) { public Result listWorker(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
logger.info("login user: {}, query all workers", loginUser.getUserName()); logger.info("login user: {}, query all workers", loginUser.getUserName());
try{ try{
Map<String, Object> result = serverService.queryWorker(loginUser); Map<String, Object> result = monitorService.queryWorker(loginUser);
return returnDataList(result); return returnDataList(result);
}catch (Exception e){ }catch (Exception e){
logger.error(LIST_WORKERS_ERROR.getMsg(),e); logger.error(LIST_WORKERS_ERROR.getMsg(),e);

41
escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java

@ -18,13 +18,16 @@ package cn.escheduler.api.service;
import cn.escheduler.api.enums.Status; import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.ZookeeperMonitorUtils; import cn.escheduler.api.utils.ZookeeperMonitor;
import cn.escheduler.dao.MonitorDBDao; import cn.escheduler.dao.MonitorDBDao;
import cn.escheduler.dao.model.MasterServer;
import cn.escheduler.dao.model.MonitorRecord; import cn.escheduler.dao.model.MonitorRecord;
import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.ZookeeperRecord; import cn.escheduler.dao.model.ZookeeperRecord;
import org.apache.hadoop.mapred.Master;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -52,6 +55,22 @@ public class MonitorService extends BaseService{
} }
/**
* query master list
*
* @param loginUser
* @return
*/
public Map<String,Object> queryMaster(User loginUser) {
Map<String, Object> result = new HashMap<>(5);
List<MasterServer> masterServers = new ZookeeperMonitor().getMasterServers();
result.put(Constants.DATA_LIST, masterServers);
putMsg(result,Status.SUCCESS);
return result;
}
/** /**
* query zookeeper state * query zookeeper state
@ -61,7 +80,7 @@ public class MonitorService extends BaseService{
public Map<String,Object> queryZookeeperState(User loginUser) { public Map<String,Object> queryZookeeperState(User loginUser) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
List<ZookeeperRecord> zookeeperRecordList = ZookeeperMonitorUtils.zookeeperInfoList(); List<ZookeeperRecord> zookeeperRecordList = ZookeeperMonitor.zookeeperInfoList();
result.put(Constants.DATA_LIST, zookeeperRecordList); result.put(Constants.DATA_LIST, zookeeperRecordList);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
@ -69,4 +88,22 @@ public class MonitorService extends BaseService{
return result; return result;
} }
/**
* query master list
*
* @param loginUser
* @return
*/
public Map<String,Object> queryWorker(User loginUser) {
Map<String, Object> result = new HashMap<>(5);
List<MasterServer> workerServers = new ZookeeperMonitor().getWorkerServers();
result.put(Constants.DATA_LIST, workerServers);
putMsg(result,Status.SUCCESS);
return result;
}
} }

40
escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitorUtils.java → escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java

@ -1,22 +1,26 @@
package cn.escheduler.api.utils; package cn.escheduler.api.utils;
import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.model.MasterServer;
import cn.escheduler.dao.model.ZookeeperRecord; import cn.escheduler.dao.model.ZookeeperRecord;
import cn.escheduler.server.ResInfo;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import sun.jvm.hotspot.opto.MachSafePointNode;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* monitor zookeeper info * monitor zookeeper info
*/ */
public class ZookeeperMonitorUtils { public class ZookeeperMonitor extends AbstractZKClient{
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitorUtils.class); private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class);
private static final String zookeeperList = AbstractZKClient.getZookeeperQuorum(); private static final String zookeeperList = AbstractZKClient.getZookeeperQuorum();
/** /**
@ -33,6 +37,38 @@ public class ZookeeperMonitorUtils {
return null; return null;
} }
/**
* get server list.
* @param isMaster
* @return
*/
public List<MasterServer> getServers(boolean isMaster){
List<MasterServer> masterServers = new ArrayList<>();
Map<String, String> masterMap = getServerList(isMaster);
String parentPath = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath();
for(String path : masterMap.keySet()){
MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path));
masterServer.setZkDirectory( parentPath + "/"+ path);
masterServers.add(masterServer);
}
return masterServers;
}
/**
* get master servers
* @return
*/
public List<MasterServer> getMasterServers(){
return getServers(true);
}
/**
* master construct is the same with worker, use the master instead
* @return
*/
public List<MasterServer> getWorkerServers(){
return getServers(false);
}
private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) { private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) {

34
escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java

@ -0,0 +1,34 @@
package cn.escheduler.api.utils;
import cn.escheduler.dao.model.MasterServer;
import org.junit.Assert;
import org.junit.Test;
import javax.crypto.MacSpi;
import java.util.List;
import static org.junit.Assert.*;
public class ZookeeperMonitorUtilsTest {
@Test
public void testGetMasterLsit(){
ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor();
List<MasterServer> masterServerList = zookeeperMonitor.getMasterServers();
List<MasterServer> workerServerList = zookeeperMonitor.getWorkerServers();
System.out.println("master:" + masterServerList);
System.out.println("worker:" + workerServerList);
Assert.assertEquals(masterServerList.size(), 1);
Assert.assertEquals(workerServerList.size(), 1);
}
}

10
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@ -417,16 +417,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
} }
} }
/**
* get zookeeper client of CuratorFramework
* @return
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/** /**
* Get the task queue path * Get the task queue path
* @param key task queue name * @param key task queue name

87
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java

@ -30,13 +30,12 @@ import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.*;
import java.util.Date;
import java.util.List;
import static cn.escheduler.common.Constants.*; import static cn.escheduler.common.Constants.*;
@ -213,9 +212,9 @@ public abstract class AbstractZKClient {
protected void initSystemZNode(){ protected void initSystemZNode(){
try { try {
// read master node parent path from conf // read master node parent path from conf
masterZNodeParentPath = conf.getString(Constants.ZOOKEEPER_ESCHEDULER_MASTERS); masterZNodeParentPath = getMasterZNodeParentPath();
// read worker node parent path from conf // read worker node parent path from conf
workerZNodeParentPath = conf.getString(Constants.ZOOKEEPER_ESCHEDULER_WORKERS); workerZNodeParentPath = getWorkerZNodeParentPath();
// read server node parent path from conf // read server node parent path from conf
deadServerZNodeParentPath = conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS); deadServerZNodeParentPath = conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS);
@ -243,6 +242,7 @@ public abstract class AbstractZKClient {
} }
} }
public void removeDeadServerByHost(String host, String serverType) throws Exception { public void removeDeadServerByHost(String host, String serverType) throws Exception {
List<String> deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath); List<String> deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath);
for(String serverPath : deadServers){ for(String serverPath : deadServers){
@ -291,6 +291,8 @@ public abstract class AbstractZKClient {
} }
/** /**
* for stop server * for stop server
* @param serverStoppable * @param serverStoppable
@ -340,6 +342,81 @@ public abstract class AbstractZKClient {
return sb.toString(); return sb.toString();
} }
/**
* get master server list map.
* result : {host : resource info}
* @return
*/
public Map<String, String> getServerList(boolean isMaster ){
Map<String, String> masterMap = new HashMap<>();
try {
String path = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath();
List<String> serverList = getZkClient().getChildren().forPath(path);
for(String server : serverList){
byte[] bytes = getZkClient().getData().forPath(path + "/" + server);
masterMap.putIfAbsent(server, new String(bytes));
}
} catch (Exception e) {
e.printStackTrace();
}
return masterMap;
}
/**
* get zkclient
* @return
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/**
* get worker node parent path
* @return
*/
protected String getWorkerZNodeParentPath(){return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_WORKERS);};
/**
* get master node parent path
* @return
*/
protected String getMasterZNodeParentPath(){return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_MASTERS);}
/**
* get master lock path
* @return
*/
public String getMasterLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS);
}
/**
* get master start up lock path
* @return
*/
public String getMasterStartUpLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS);
}
/**
* get master failover lock path
* @return
*/
public String getMasterFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_MASTERS);
}
/**
* get worker failover lock path
* @return
*/
public String getWorkerFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS);
}
@Override @Override
public String toString() { public String toString() {
return "AbstractZKClient{" + return "AbstractZKClient{" +

2
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java

@ -84,8 +84,6 @@ public class TaskQueueImplTest {
return; return;
} }
String node2 = tasks.get(0);
} }

25
escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java

@ -17,8 +17,12 @@
package cn.escheduler.server; package cn.escheduler.server;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.model.MasterServer;
import java.util.Date;
/** /**
* heartbeat for ZK reigster res info * heartbeat for ZK reigster res info
@ -119,4 +123,25 @@ public class ResInfo {
+ lastHeartbeatTime; + lastHeartbeatTime;
} }
/**
* parse heartbeat info for zk
* @param heartBeatInfo
* @return
*/
public static MasterServer parseHeartbeatForZKInfo(String heartBeatInfo){
MasterServer masterServer = null;
String[] masterArray = heartBeatInfo.split(Constants.COMMA);
if(masterArray.length != 6){
return masterServer;
}
masterServer = new MasterServer();
masterServer.setHost(masterArray[0]);
masterServer.setPort(Integer.parseInt(masterArray[1]));
masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[2]), Double.parseDouble(masterArray[3])));
masterServer.setCreateTime(DateUtils.stringToDate(masterArray[4]));
masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[5]));
return masterServer;
}
} }

39
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

@ -448,45 +448,6 @@ public class ZKMasterClient extends AbstractZKClient {
} }
/**
* get master lock path
* @return
*/
public String getMasterLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS);
}
/**
* get master start up lock path
* @return
*/
public String getMasterStartUpLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS);
}
/**
* get master failover lock path
* @return
*/
public String getMasterFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_MASTERS);
}
/**
* get worker failover lock path
* @return
*/
public String getWorkerFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS);
}
/**
* get zkclient
* @return
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/** /**

10
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java

@ -264,16 +264,6 @@ public class ZKWorkerClient extends AbstractZKClient {
return workerZNode; return workerZNode;
} }
/**
* get zkclient
* @return
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/** /**
* get worker lock path * get worker lock path
* @return * @return

5
escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java

@ -1,6 +1,7 @@
package cn.escheduler.server.zk; package cn.escheduler.server.zk;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.zk.AbstractZKClient;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
@ -17,8 +18,8 @@ public class ZKWorkerClientTest {
public void getZKWorkerClient() throws Exception { public void getZKWorkerClient() throws Exception {
ZKWorkerClient zkWorkerClient = ZKWorkerClient.getZKWorkerClient(); // ZKWorkerClient zkWorkerClient = ZKWorkerClient.getZKWorkerClient();
zkWorkerClient.removeDeadServerByHost("127.0.0.1", Constants.WORKER_PREFIX); // zkWorkerClient.removeDeadServerByHost("127.0.0.1", Constants.WORKER_PREFIX);
} }

Loading…
Cancel
Save