Browse Source

Merge pull request #550 from lenboo/dev-1.1.0

get master/worker informations from zookeeper
pull/2/head
easyscheduler 6 years ago committed by GitHub
parent
commit
ca701bd0ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java
  2. 41
      escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java
  3. 39
      escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java
  4. 29
      escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java
  5. 10
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
  6. 6
      escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java
  7. 87
      escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
  8. 4
      escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
  9. 41
      escheduler-common/src/test/java/cn/escheduler/common/utils/IpUtilsTest.java
  10. 35
      escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java
  11. 67
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
  12. 31
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java
  13. 5
      escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java

4
escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java

@ -66,7 +66,7 @@ public class MonitorController extends BaseController{
logger.info("login user: {}, query all master", loginUser.getUserName());
try{
logger.info("list master, user:{}", loginUser.getUserName());
Map<String, Object> result = serverService.queryMaster(loginUser);
Map<String, Object> result = monitorService.queryMaster(loginUser);
return returnDataList(result);
}catch (Exception e){
logger.error(LIST_MASTERS_ERROR.getMsg(),e);
@ -86,7 +86,7 @@ public class MonitorController extends BaseController{
public Result listWorker(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
logger.info("login user: {}, query all workers", loginUser.getUserName());
try{
Map<String, Object> result = serverService.queryWorker(loginUser);
Map<String, Object> result = monitorService.queryWorker(loginUser);
return returnDataList(result);
}catch (Exception e){
logger.error(LIST_WORKERS_ERROR.getMsg(),e);

41
escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java

@ -18,13 +18,16 @@ package cn.escheduler.api.service;
import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.ZookeeperMonitorUtils;
import cn.escheduler.api.utils.ZookeeperMonitor;
import cn.escheduler.dao.MonitorDBDao;
import cn.escheduler.dao.model.MasterServer;
import cn.escheduler.dao.model.MonitorRecord;
import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.ZookeeperRecord;
import org.apache.hadoop.mapred.Master;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -52,6 +55,22 @@ public class MonitorService extends BaseService{
}
/**
* query master list
*
* @param loginUser
* @return
*/
public Map<String,Object> queryMaster(User loginUser) {
Map<String, Object> result = new HashMap<>(5);
List<MasterServer> masterServers = new ZookeeperMonitor().getMasterServers();
result.put(Constants.DATA_LIST, masterServers);
putMsg(result,Status.SUCCESS);
return result;
}
/**
* query zookeeper state
@ -61,7 +80,7 @@ public class MonitorService extends BaseService{
public Map<String,Object> queryZookeeperState(User loginUser) {
Map<String, Object> result = new HashMap<>(5);
List<ZookeeperRecord> zookeeperRecordList = ZookeeperMonitorUtils.zookeeperInfoList();
List<ZookeeperRecord> zookeeperRecordList = ZookeeperMonitor.zookeeperInfoList();
result.put(Constants.DATA_LIST, zookeeperRecordList);
putMsg(result, Status.SUCCESS);
@ -69,4 +88,22 @@ public class MonitorService extends BaseService{
return result;
}
/**
* query master list
*
* @param loginUser
* @return
*/
public Map<String,Object> queryWorker(User loginUser) {
Map<String, Object> result = new HashMap<>(5);
List<MasterServer> workerServers = new ZookeeperMonitor().getWorkerServers();
result.put(Constants.DATA_LIST, workerServers);
putMsg(result,Status.SUCCESS);
return result;
}
}

39
escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitorUtils.java → escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java

@ -1,7 +1,9 @@
package cn.escheduler.api.utils;
import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.model.MasterServer;
import cn.escheduler.dao.model.ZookeeperRecord;
import cn.escheduler.server.ResInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -9,14 +11,15 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* monitor zookeeper info
*/
public class ZookeeperMonitorUtils {
public class ZookeeperMonitor extends AbstractZKClient{
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitorUtils.class);
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class);
private static final String zookeeperList = AbstractZKClient.getZookeeperQuorum();
/**
@ -33,6 +36,38 @@ public class ZookeeperMonitorUtils {
return null;
}
/**
* get server list.
* @param isMaster
* @return
*/
public List<MasterServer> getServers(boolean isMaster){
List<MasterServer> masterServers = new ArrayList<>();
Map<String, String> masterMap = getServerList(isMaster);
String parentPath = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath();
for(String path : masterMap.keySet()){
MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path));
masterServer.setZkDirectory( parentPath + "/"+ path);
masterServers.add(masterServer);
}
return masterServers;
}
/**
* get master servers
* @return
*/
public List<MasterServer> getMasterServers(){
return getServers(true);
}
/**
* master construct is the same with worker, use the master instead
* @return
*/
public List<MasterServer> getWorkerServers(){
return getServers(false);
}
private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) {

29
escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java

@ -0,0 +1,29 @@
package cn.escheduler.api.utils;
import cn.escheduler.dao.model.MasterServer;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class ZookeeperMonitorUtilsTest {
@Test
public void testGetMasterLsit(){
ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor();
List<MasterServer> masterServerList = zookeeperMonitor.getMasterServers();
List<MasterServer> workerServerList = zookeeperMonitor.getWorkerServers();
Assert.assertEquals(masterServerList.size(), 1);
Assert.assertEquals(workerServerList.size(), 1);
}
}

10
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@ -417,16 +417,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
}
}
/**
* get zookeeper client of CuratorFramework
* @return
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/**
* Get the task queue path
* @param key task queue name

6
escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java

@ -61,10 +61,4 @@ public class IpUtils {
return sb.toString();
}
public static void main(String[] args){
long ipLong = ipToLong("11.3.4.5");
logger.info(longToIp(ipLong));
}
}

87
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java

@ -30,13 +30,12 @@ import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.*;
import static cn.escheduler.common.Constants.*;
@ -213,9 +212,9 @@ public abstract class AbstractZKClient {
protected void initSystemZNode(){
try {
// read master node parent path from conf
masterZNodeParentPath = conf.getString(Constants.ZOOKEEPER_ESCHEDULER_MASTERS);
masterZNodeParentPath = getMasterZNodeParentPath();
// read worker node parent path from conf
workerZNodeParentPath = conf.getString(Constants.ZOOKEEPER_ESCHEDULER_WORKERS);
workerZNodeParentPath = getWorkerZNodeParentPath();
// read server node parent path from conf
deadServerZNodeParentPath = conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS);
@ -243,6 +242,7 @@ public abstract class AbstractZKClient {
}
}
public void removeDeadServerByHost(String host, String serverType) throws Exception {
List<String> deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath);
for(String serverPath : deadServers){
@ -291,6 +291,8 @@ public abstract class AbstractZKClient {
}
/**
* for stop server
* @param serverStoppable
@ -339,6 +341,81 @@ public abstract class AbstractZKClient {
return sb.toString();
}
/**
* get master server list map.
* result : {host : resource info}
* @return
*/
public Map<String, String> getServerList(boolean isMaster ){
Map<String, String> masterMap = new HashMap<>();
try {
String path = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath();
List<String> serverList = getZkClient().getChildren().forPath(path);
for(String server : serverList){
byte[] bytes = getZkClient().getData().forPath(path + "/" + server);
masterMap.putIfAbsent(server, new String(bytes));
}
} catch (Exception e) {
e.printStackTrace();
}
return masterMap;
}
/**
* get zkclient
* @return
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/**
* get worker node parent path
* @return
*/
protected String getWorkerZNodeParentPath(){return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_WORKERS);};
/**
* get master node parent path
* @return
*/
protected String getMasterZNodeParentPath(){return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_MASTERS);}
/**
* get master lock path
* @return
*/
public String getMasterLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS);
}
/**
* get master start up lock path
* @return
*/
public String getMasterStartUpLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS);
}
/**
* get master failover lock path
* @return
*/
public String getMasterFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_MASTERS);
}
/**
* get worker failover lock path
* @return
*/
public String getWorkerFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS);
}
@Override
public String toString() {
return "AbstractZKClient{" +

4
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java

@ -20,13 +20,11 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.OSUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@ -84,8 +82,6 @@ public class TaskQueueImplTest {
return;
}
String node2 = tasks.get(0);
}

41
escheduler-common/src/test/java/cn/escheduler/common/utils/IpUtilsTest.java

@ -0,0 +1,41 @@
package cn.escheduler.common.utils;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
public class IpUtilsTest {
@Test
public void ipToLong() {
String ip = "192.168.110.1";
String ip2 = "0.0.0.0";
long longNumber = IpUtils.ipToLong(ip);
long longNumber2 = IpUtils.ipToLong(ip2);
System.out.println(longNumber);
Assert.assertEquals(longNumber, 3232263681L);
Assert.assertEquals(longNumber2, 0L);
String ip3 = "255.255.255.255";
long longNumber3 = IpUtils.ipToLong(ip3);
System.out.println(longNumber3);
Assert.assertEquals(longNumber3, 4294967295L);
}
@Test
public void longToIp() {
String ip = "192.168.110.1";
String ip2 = "0.0.0.0";
long longNum = 3232263681L;
String i1 = IpUtils.longToIp(longNum);
String i2 = IpUtils.longToIp(0);
Assert.assertEquals(ip, i1);
Assert.assertEquals(ip2, i2);
}
}

35
escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java

@ -17,8 +17,12 @@
package cn.escheduler.server;
import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.model.MasterServer;
import java.util.Date;
/**
* heartbeat for ZK reigster res info
@ -98,6 +102,16 @@ public class ResInfo {
}
public static String getHeartBeatInfo(Date now){
return buildHeartbeatForZKInfo(OSUtils.getHost(),
OSUtils.getProcessID(),
OSUtils.cpuUsage(),
OSUtils.memoryUsage(),
DateUtils.dateToString(now),
DateUtils.dateToString(now));
}
/**
* build heartbeat info for zk
* @param host
@ -119,4 +133,25 @@ public class ResInfo {
+ lastHeartbeatTime;
}
/**
* parse heartbeat info for zk
* @param heartBeatInfo
* @return
*/
public static MasterServer parseHeartbeatForZKInfo(String heartBeatInfo){
MasterServer masterServer = null;
String[] masterArray = heartBeatInfo.split(Constants.COMMA);
if(masterArray.length != 6){
return masterServer;
}
masterServer = new MasterServer();
masterServer.setHost(masterArray[0]);
masterServer.setPort(Integer.parseInt(masterArray[1]));
masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[2]), Double.parseDouble(masterArray[3])));
masterServer.setCreateTime(DateUtils.stringToDate(masterArray[4]));
masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[5]));
return masterServer;
}
}

67
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

@ -204,7 +204,7 @@ public class ZKMasterClient extends AbstractZKClient {
}
// specify the format of stored data in ZK nodes
String heartbeatZKInfo = getOsInfo(now);
String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now);
// create temporary sequence nodes for master znode
masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes());
@ -259,10 +259,10 @@ public class ZKMasterClient extends AbstractZKClient {
return false;
}
List<String> masterZNodeList = null;
masterZNodeList = zkClient.getChildren().forPath(path);
if (CollectionUtils.isNotEmpty(masterZNodeList)){
for (String masterZNode : masterZNodeList){
List<String> serverList = null;
serverList = zkClient.getChildren().forPath(path);
if (CollectionUtils.isNotEmpty(serverList)){
for (String masterZNode : serverList){
if (masterZNode.startsWith(host)){
return true;
}
@ -423,22 +423,6 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
* get os info
* @param now
* @return
*/
private String getOsInfo(Date now) {
return ResInfo.buildHeartbeatForZKInfo(OSUtils.getHost(),
OSUtils.getProcessID(),
OSUtils.cpuUsage(),
OSUtils.memoryUsage(),
DateUtils.dateToString(now),
DateUtils.dateToString(now));
}
/**
* get master znode
* @return
@ -448,45 +432,6 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
* get master lock path
* @return
*/
public String getMasterLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS);
}
/**
* get master start up lock path
* @return
*/
public String getMasterStartUpLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS);
}
/**
* get master failover lock path
* @return
*/
public String getMasterFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_MASTERS);
}
/**
* get worker failover lock path
* @return
*/
public String getWorkerFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS);
}
/**
* get zkclient
* @return
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/**
@ -580,7 +525,7 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
* get host ip
* get host ip, string format: masterParentPath/ip_000001/value
* @param path
* @return
*/

31
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java

@ -116,11 +116,10 @@ public class ZKWorkerClient extends AbstractZKClient {
public String initWorkZNode() throws Exception {
Date now = new Date();
String heartbeatZKInfo = getOsInfo(now);
String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
workerZNode = workerZNodeParentPath + "/" + OSUtils.getHost() + "_";
workerZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(workerZNode,
heartbeatZKInfo.getBytes());
logger.info("register worker node {} success", workerZNode);
@ -141,7 +140,6 @@ public class ZKWorkerClient extends AbstractZKClient {
workerZNode = workerZNodeParentPath + "/" + OSUtils.getHost() + "_";
List<String> workerZNodeList = zkClient.getChildren().forPath(workerZNodeParentPath);
if (CollectionUtils.isNotEmpty(workerZNodeList)){
boolean flag = false;
for (String workerZNode : workerZNodeList){
@ -241,21 +239,6 @@ public class ZKWorkerClient extends AbstractZKClient {
}
/**
* get os info
* @param now
* @return
*/
private String getOsInfo(Date now) {
return ResInfo.buildHeartbeatForZKInfo(OSUtils.getHost(),
OSUtils.getProcessID(),
OSUtils.cpuUsage(),
OSUtils.memoryUsage(),
DateUtils.dateToString(now),
DateUtils.dateToString(now));
}
/**
* get worker znode
* @return
@ -264,16 +247,6 @@ public class ZKWorkerClient extends AbstractZKClient {
return workerZNode;
}
/**
* get zkclient
* @return
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/**
* get worker lock path
* @return

5
escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java

@ -1,6 +1,7 @@
package cn.escheduler.server.zk;
import cn.escheduler.common.Constants;
import cn.escheduler.common.zk.AbstractZKClient;
import org.junit.Test;
import java.util.Arrays;
@ -17,8 +18,8 @@ public class ZKWorkerClientTest {
public void getZKWorkerClient() throws Exception {
ZKWorkerClient zkWorkerClient = ZKWorkerClient.getZKWorkerClient();
zkWorkerClient.removeDeadServerByHost("127.0.0.1", Constants.WORKER_PREFIX);
// ZKWorkerClient zkWorkerClient = ZKWorkerClient.getZKWorkerClient();
// zkWorkerClient.removeDeadServerByHost("127.0.0.1", Constants.WORKER_PREFIX);
}

Loading…
Cancel
Save