From 47f90e25aa21b573612e3b822a2eb52898f9ebd6 Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 25 Jul 2019 11:41:22 +0800 Subject: [PATCH 01/11] refactor zkMasterClient/zkWorkerClient --- .../api/service/MonitorService.java | 12 +- .../api/service/SchedulerService.java | 1 + .../escheduler/api/service/ServerService.java | 2 +- .../api/utils/ZookeeperMonitor.java | 26 +- .../api/utils/ZookeeperMonitorUtilsTest.java | 2 +- .../common}/model/MasterServer.java | 2 +- .../cn/escheduler/common/utils}/ResInfo.java | 7 +- .../common/zk/AbstractZKClient.java | 102 ++++++- .../java/cn/escheduler/dao/ServerDao.java | 2 +- .../dao/mapper/MasterServerMapper.java | 2 +- .../dao/mapper/MasterServerMapperTest.java | 2 +- .../master/runner/MasterSchedulerThread.java | 14 +- .../server/worker/runner/FetchTaskThread.java | 13 +- .../escheduler/server/zk/ZKMasterClient.java | 256 ++++++------------ .../escheduler/server/zk/ZKWorkerClient.java | 60 +--- 15 files changed, 216 insertions(+), 287 deletions(-) rename {escheduler-dao/src/main/java/cn/escheduler/dao => escheduler-common/src/main/java/cn/escheduler/common}/model/MasterServer.java (98%) rename {escheduler-server/src/main/java/cn/escheduler/server => escheduler-common/src/main/java/cn/escheduler/common/utils}/ResInfo.java (95%) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java index 08e8bf576e..33d69859d2 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java @@ -20,14 +20,12 @@ import cn.escheduler.api.enums.Status; import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.ZookeeperMonitor; import cn.escheduler.dao.MonitorDBDao; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.MonitorRecord; import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.ZookeeperRecord; -import org.apache.hadoop.mapred.Master; import org.springframework.stereotype.Service; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,7 +63,9 @@ public class MonitorService extends BaseService{ Map result = new HashMap<>(5); - List masterServers = new ZookeeperMonitor().getMasterServers(); + ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor(); + List masterServers = zookeeperMonitor.getMasterServers(); + zookeeperMonitor.close(); result.put(Constants.DATA_LIST, masterServers); putMsg(result,Status.SUCCESS); @@ -99,8 +99,10 @@ public class MonitorService extends BaseService{ public Map queryWorker(User loginUser) { Map result = new HashMap<>(5); + ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor(); + List workerServers = zookeeperMonitor.getWorkerServers(); + zookeeperMonitor.close(); - List workerServers = new ZookeeperMonitor().getWorkerServers(); result.put(Constants.DATA_LIST, workerServers); putMsg(result,Status.SUCCESS); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java index d4515c79f3..64fc8c76eb 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java @@ -25,6 +25,7 @@ import cn.escheduler.common.enums.FailureStrategy; import cn.escheduler.common.enums.Priority; import cn.escheduler.common.enums.ReleaseState; import cn.escheduler.common.enums.WarningType; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.dao.ProcessDao; diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java index 6957075dda..a54b385b0d 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ServerService.java @@ -20,7 +20,7 @@ import cn.escheduler.api.enums.Status; import cn.escheduler.api.utils.Constants; import cn.escheduler.dao.mapper.MasterServerMapper; import cn.escheduler.dao.mapper.WorkerServerMapper; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.WorkerServer; import org.springframework.beans.factory.annotation.Autowired; diff --git a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java index 0f44b5f7db..ddbb6c9c1a 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java @@ -1,9 +1,9 @@ package cn.escheduler.api.utils; +import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.zk.AbstractZKClient; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.ZookeeperRecord; -import cn.escheduler.server.ResInfo; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Map; /** @@ -36,29 +35,12 @@ public class ZookeeperMonitor extends AbstractZKClient{ return null; } - /** - * get server list. - * @param isMaster - * @return - */ - public List getServers(boolean isMaster){ - List masterServers = new ArrayList<>(); - Map masterMap = getServerList(isMaster); - String parentPath = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath(); - for(String path : masterMap.keySet()){ - MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path)); - masterServer.setZkDirectory( parentPath + "/"+ path); - masterServers.add(masterServer); - } - return masterServers; - } - /** * get master servers * @return */ public List getMasterServers(){ - return getServers(true); + return getServers(ZKNodeType.MASTER); } /** @@ -66,7 +48,7 @@ public class ZookeeperMonitor extends AbstractZKClient{ * @return */ public List getWorkerServers(){ - return getServers(false); + return getServers(ZKNodeType.WORKER); } private static List zookeeperInfoList(String zookeeperServers) { diff --git a/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java b/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java index 87a26ba449..4feb06ad5c 100644 --- a/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java +++ b/escheduler-api/src/test/java/cn/escheduler/api/utils/ZookeeperMonitorUtilsTest.java @@ -1,6 +1,6 @@ package cn.escheduler.api.utils; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import org.junit.Assert; import org.junit.Test; diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java b/escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java similarity index 98% rename from escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java rename to escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java index 1ccb15b5cc..bb2f38cb14 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/MasterServer.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/model/MasterServer.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cn.escheduler.dao.model; +package cn.escheduler.common.model; import java.util.Date; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java similarity index 95% rename from escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java rename to escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java index 844c7be8b0..6a48d6bc89 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/ResInfo.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/ResInfo.java @@ -14,13 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cn.escheduler.server; +package cn.escheduler.common.utils; import cn.escheduler.common.Constants; -import cn.escheduler.common.utils.DateUtils; -import cn.escheduler.common.utils.JSONUtils; -import cn.escheduler.common.utils.OSUtils; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import java.util.Date; diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 3c58996298..79f26c0e82 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -18,19 +18,24 @@ package cn.escheduler.common.zk; import cn.escheduler.common.Constants; import cn.escheduler.common.IStoppable; +import cn.escheduler.common.enums.ZKNodeType; +import cn.escheduler.common.model.MasterServer; +import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; +import cn.escheduler.common.utils.ResInfo; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,7 +222,7 @@ public abstract class AbstractZKClient { workerZNodeParentPath = getWorkerZNodeParentPath(); // read server node parent path from conf - deadServerZNodeParentPath = conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS); + deadServerZNodeParentPath = getDeadZNodeParentPath(); if(zkClient.checkExists().forPath(deadServerZNodeParentPath) == null){ // create persistent dead server parent node @@ -243,6 +248,7 @@ public abstract class AbstractZKClient { } + public void removeDeadServerByHost(String host, String serverType) throws Exception { List deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath); for(String serverPath : deadServers){ @@ -341,16 +347,34 @@ public abstract class AbstractZKClient { return sb.toString(); } + /** + * get server list. + * @param zkNodeType + * @return + */ + public List getServers(ZKNodeType zkNodeType){ + Map masterMap = getServerList(zkNodeType); + String parentPath = getZNodeParentPath(zkNodeType); + + List masterServers = new ArrayList<>(); + for(String path : masterMap.keySet()){ + MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(masterMap.get(path)); + masterServer.setZkDirectory( parentPath + "/"+ path); + masterServers.add(masterServer); + } + return masterServers; + } + /** * get master server list map. * result : {host : resource info} * @return */ - public Map getServerList(boolean isMaster ){ + public Map getServerList(ZKNodeType zkNodeType){ Map masterMap = new HashMap<>(); try { - String path = isMaster ? getMasterZNodeParentPath() : getWorkerZNodeParentPath(); + String path = getZNodeParentPath(zkNodeType); List serverList = getZkClient().getChildren().forPath(path); for(String server : serverList){ byte[] bytes = getZkClient().getData().forPath(path + "/" + server); @@ -363,6 +387,28 @@ public abstract class AbstractZKClient { return masterMap; } + /** + * check the zookeeper node already exists + * @param host + * @param zkNodeType + * @return + * @throws Exception + */ + public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception { + String path = getZNodeParentPath(zkNodeType); + if(StringUtils.isEmpty(path)){ + logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString()); + return false; + } + Map serverMaps = getServerList(zkNodeType); + for(String hostKey : serverMaps.keySet()){ + if(hostKey.startsWith(host)){ + return true; + } + } + return false; + } + /** * get zkclient * @return @@ -391,6 +437,34 @@ public abstract class AbstractZKClient { return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_MASTERS); } + /** + * get zookeeper node parent path + * @param zkNodeType + * @return + */ + public String getZNodeParentPath(ZKNodeType zkNodeType) { + String path = ""; + switch (zkNodeType){ + case MASTER: + return getMasterZNodeParentPath(); + case WORKER: + return getWorkerZNodeParentPath(); + case DEAD_SERVER: + return getDeadZNodeParentPath(); + default: + break; + } + return path; + } + + /** + * get dead server node parent path + * @return + */ + protected String getDeadZNodeParentPath(){ + return conf.getString(ZOOKEEPER_ESCHEDULER_DEAD_SERVERS); + } + /** * get master start up lock path * @return @@ -415,6 +489,26 @@ public abstract class AbstractZKClient { return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS); } + /** + * release mutex + * @param mutex + */ + public static void releaseMutex(InterProcessMutex mutex) { + if (mutex != null){ + try { + mutex.release(); + } catch (Exception e) { + if(e.getMessage().equals("instance must be started before calling this method")){ + logger.warn("lock release"); + }else{ + logger.error("lock release failed : " + e.getMessage(),e); + } + + } + } + } + + @Override public String toString() { diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java index 82c71e2855..36823d8b25 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ServerDao.java @@ -18,7 +18,7 @@ package cn.escheduler.dao; import cn.escheduler.dao.mapper.MasterServerMapper; import cn.escheduler.dao.mapper.WorkerServerMapper; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.WorkerServer; import org.springframework.beans.factory.annotation.Autowired; diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java index 4fec8092de..8fbfb5298e 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/MasterServerMapper.java @@ -16,7 +16,7 @@ */ package cn.escheduler.dao.mapper; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import org.apache.ibatis.annotations.*; import org.apache.ibatis.type.JdbcType; diff --git a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java index 9f66069eeb..f74683d149 100644 --- a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java +++ b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/MasterServerMapperTest.java @@ -17,7 +17,7 @@ package cn.escheduler.dao.mapper; import cn.escheduler.dao.datasource.ConnectionFactory; -import cn.escheduler.dao.model.MasterServer; +import cn.escheduler.common.model.MasterServer; import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java index b845e19ae0..8562337554 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterSchedulerThread.java @@ -20,6 +20,7 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.thread.Stopper; import cn.escheduler.common.thread.ThreadUtils; import cn.escheduler.common.utils.OSUtils; +import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.server.zk.ZKMasterClient; @@ -98,18 +99,7 @@ public class MasterSchedulerThread implements Runnable { }catch (Exception e){ logger.error("master scheduler thread exception : " + e.getMessage(),e); }finally{ - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - if(e.getMessage().equals("instance must be started before calling this method")){ - logger.warn("lock release"); - }else{ - logger.error("lock release failed : " + e.getMessage(),e); - } - - } - } + AbstractZKClient.releaseMutex(mutex); } } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 5495096161..8f23895547 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -22,6 +22,7 @@ import cn.escheduler.common.thread.Stopper; import cn.escheduler.common.thread.ThreadUtils; import cn.escheduler.common.utils.FileUtils; import cn.escheduler.common.utils.OSUtils; +import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.*; import cn.escheduler.server.zk.ZKWorkerClient; @@ -235,17 +236,7 @@ public class FetchTaskThread implements Runnable{ }catch (Exception e){ logger.error("fetch task thread exception : " + e.getMessage(),e); }finally { - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - if(e.getMessage().equals("instance must be started before calling this method")){ - logger.warn("fetch task lock release"); - }else{ - logger.error("fetch task lock release failed : " + e.getMessage(),e); - } - } - } + AbstractZKClient.releaseMutex(mutex); } } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index 3596155dd3..50ff76a024 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -19,8 +19,8 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.ZKNodeType; +import cn.escheduler.common.model.MasterServer; import cn.escheduler.common.utils.CollectionUtils; -import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.AlertDao; @@ -30,7 +30,7 @@ import cn.escheduler.dao.ServerDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.WorkerServer; -import cn.escheduler.server.ResInfo; +import cn.escheduler.common.utils.ResInfo; import cn.escheduler.server.utils.ProcessUtils; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; @@ -118,7 +118,6 @@ public class ZKMasterClient extends AbstractZKClient { try { // create distributed lock with the root node path of the lock space as /escheduler/lock/failover/master String znodeLock = getMasterStartUpLockPath(); - mutex = new InterProcessMutex(zkClient, znodeLock); mutex.acquire(); @@ -137,29 +136,19 @@ public class ZKMasterClient extends AbstractZKClient { // check if fault tolerance is required,failure and tolerance if (getActiveMasterNum() == 1) { failoverWorker(null, true); -// processDao.masterStartupFaultTolerant(); failoverMaster(null); } }catch (Exception e){ logger.error("master start up exception : " + e.getMessage(),e); }finally { - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - if(e.getMessage().equals("instance must be started before calling this method")){ - logger.warn("lock release"); - }else{ - logger.error("lock release failed : " + e.getMessage(),e); - } - - } - } + releaseMutex(mutex); } } + + /** * init dao */ @@ -202,75 +191,25 @@ public class ZKMasterClient extends AbstractZKClient { // exit system System.exit(-1); } - - // specify the format of stored data in ZK nodes - String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now); - // create temporary sequence nodes for master znode - masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( - masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes()); - + createMasterZNode(now); logger.info("register master node {} success" , masterZNode); // handle dead server handleDeadServer(masterZNode, Constants.MASTER_PREFIX, Constants.DELETE_ZK_OP); - - // delete master server from database - serverDao.deleteMaster(OSUtils.getHost()); - - // register master znode - serverDao.registerMaster(OSUtils.getHost(), - OSUtils.getProcessID(), - masterZNode, - ResInfo.getResInfoJson(), - createTime, - createTime); - } catch (Exception e) { logger.error("register master failure : " + e.getMessage(),e); } } - - /** - * check the zookeeper node already exists - * @param host - * @param zkNodeType - * @return - * @throws Exception - */ - private boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception { - - String path = null; - switch (zkNodeType){ - case MASTER: - path = masterZNodeParentPath; - break; - case WORKER: - path = workerZNodeParentPath; - break; - case DEAD_SERVER: - path = deadServerZNodeParentPath; - break; - default: - break; - } - if(StringUtils.isEmpty(path)){ - logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString()); - return false; - } - - List serverList = null; - serverList = zkClient.getChildren().forPath(path); - if (CollectionUtils.isNotEmpty(serverList)){ - for (String masterZNode : serverList){ - if (masterZNode.startsWith(host)){ - return true; - } - } - } - return false; + private void createMasterZNode(Date now) throws Exception { + // specify the format of stored data in ZK nodes + String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now); + // create temporary sequence nodes for master znode + masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( + masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes()); } + /** * monitor master */ @@ -291,59 +230,9 @@ public class ZKMasterClient extends AbstractZKClient { case CHILD_REMOVED: String path = event.getData().getPath(); logger.info("master node deleted : {}",event.getData().getPath()); - - InterProcessMutex mutexLock = null; - try { - // handle dead server, add to zk dead server pth - handleDeadServer(path, Constants.MASTER_PREFIX, Constants.ADD_ZK_OP); - - if(masterZNode.equals(path)){ - logger.error("master server({}) of myself dead , stopping...", path); - stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path)); - break; - } - - // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master - String znodeLock = zkMasterClient.getMasterFailoverLockPath(); - mutexLock = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); - mutexLock.acquire(); - - String masterHost = getHostByEventDataPath(path); - for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, masterHost, "Master-Server"); - } - if(StringUtils.isNotEmpty(masterHost)){ - failoverMaster(masterHost); - } - }catch (Exception e){ - logger.error("master failover failed : " + e.getMessage(),e); - }finally { - if (mutexLock != null){ - try { - mutexLock.release(); - } catch (Exception e) { - logger.error("lock relase failed : " + e.getMessage(),e); - } - } - } + removeMasterNode(path); break; case CHILD_UPDATED: - if (event.getData().getPath().contains(OSUtils.getHost())){ - byte[] bytes = zkClient.getData().forPath(event.getData().getPath()); - String resInfoStr = new String(bytes); - String[] splits = resInfoStr.split(Constants.COMMA); - if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) { - return; - } - // updateProcessInstance Master information in database according to host - serverDao.updateMaster(OSUtils.getHost(), - OSUtils.getProcessID(), - ResInfo.getResInfoJson(Double.parseDouble(splits[2]), - Double.parseDouble(splits[3])), - DateUtils.stringToDate(splits[5])); - - logger.debug("master zk node updated : {}",event.getData().getPath()); - } break; default: break; @@ -356,6 +245,42 @@ public class ZKMasterClient extends AbstractZKClient { } + private void removeMasterNode(String path) { + InterProcessMutex mutexLock = null; + try { + // handle dead server, add to zk dead server pth + handleDeadServer(path, Constants.MASTER_PREFIX, Constants.ADD_ZK_OP); + + if(masterZNode.equals(path)){ + logger.error("master server({}) of myself dead , stopping...", path); + stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path)); + return; + } + + // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master + String znodeLock = zkMasterClient.getMasterFailoverLockPath(); + mutexLock = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); + mutexLock.acquire(); + + String masterHost = getHostByEventDataPath(path); + for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { + alertDao.sendServerStopedAlert(1, masterHost, "Master-Server"); + } + if(StringUtils.isNotEmpty(masterHost)){ + failoverMaster(masterHost); + } + }catch (Exception e){ + logger.error("master failover failed : " + e.getMessage(),e); + }finally { + if (mutexLock != null){ + try { + mutexLock.release(); + } catch (Exception e) { + logger.error("lock relase failed : " + e.getMessage(),e); + } + } + } + } /** @@ -377,40 +302,8 @@ public class ZKMasterClient extends AbstractZKClient { break; case CHILD_REMOVED: String path = event.getData().getPath(); - logger.info("node deleted : {}",event.getData().getPath()); - - InterProcessMutex mutex = null; - try { - - // handle dead server - handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); - - // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/worker - String znodeLock = zkMasterClient.getWorkerFailoverLockPath(); - mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); - mutex.acquire(); - - String workerHost = getHostByEventDataPath(path); - for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server"); - } - - if(StringUtils.isNotEmpty(workerHost)){ - failoverWorker(workerHost, true); - } - }catch (Exception e){ - logger.error("worker failover failed : " + e.getMessage(),e); - } - finally { - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - logger.error("lock relase failed : " + e.getMessage(),e); - } - } - } + removeZKNodePath(path); break; default: break; @@ -420,7 +313,34 @@ public class ZKMasterClient extends AbstractZKClient { }catch (Exception e){ logger.error("listener worker failed : " + e.getMessage(),e); } + } + + private void removeZKNodePath(String path) { + InterProcessMutex mutex = null; + try { + + // handle dead server + handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); + + // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/worker + String znodeLock = zkMasterClient.getWorkerFailoverLockPath(); + mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); + mutex.acquire(); + + String workerHost = getHostByEventDataPath(path); + for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { + alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server"); + } + if(StringUtils.isNotEmpty(workerHost)){ + failoverWorker(workerHost, true); + } + }catch (Exception e){ + logger.error("worker failover failed : " + e.getMessage(),e); + } + finally { + releaseMutex(mutex); + } } /** @@ -431,9 +351,6 @@ public class ZKMasterClient extends AbstractZKClient { return masterZNode; } - - - /** * task needs failover if task start before worker starts * @@ -460,15 +377,20 @@ public class ZKMasterClient extends AbstractZKClient { * @return */ private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { + if(StringUtils.isEmpty(taskInstance.getHost())){ + return false; + } Date workerServerStartDate = null; - List workerServers = processDao.queryWorkerServerByHost(taskInstance.getHost()); - if(workerServers.size() > 0){ - workerServerStartDate = workerServers.get(0).getCreateTime(); + List workerServers= getServers(ZKNodeType.WORKER); + for(MasterServer server : workerServers){ + if(server.getHost().equals(taskInstance.getHost())){ + workerServerStartDate = server.getCreateTime(); + break; + } } if(workerServerStartDate != null){ return taskInstance.getStartTime().after(workerServerStartDate); - }else{ return false; } @@ -478,6 +400,7 @@ public class ZKMasterClient extends AbstractZKClient { * failover worker tasks * 1. kill yarn job if there are yarn jobs in tasks. * 2. change task state from running to need failover. + * 3. failover all tasks when workerHost is null * @param workerHost */ private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { @@ -501,9 +424,6 @@ public class ZKMasterClient extends AbstractZKClient { taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); processDao.saveTaskInstance(taskInstance); } - - //update task Instance state value is NEED_FAULT_TOLERANCE - // processDao.updateNeedFailoverTaskInstances(workerHost); logger.info("end worker[{}] failover ...", workerHost); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java index e00d72da24..2090ec1bdd 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java @@ -17,13 +17,13 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; +import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.utils.CollectionUtils; -import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.ServerDao; -import cn.escheduler.server.ResInfo; +import cn.escheduler.common.utils.ResInfo; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -130,50 +130,19 @@ public class ZKWorkerClient extends AbstractZKClient { * register worker */ private void registWorker(){ - // get current date Date now = new Date(); createTime = now ; try { - - // encapsulation worker znnode - workerZNode = workerZNodeParentPath + "/" + OSUtils.getHost() + "_"; - List workerZNodeList = zkClient.getChildren().forPath(workerZNodeParentPath); - - if (CollectionUtils.isNotEmpty(workerZNodeList)){ - boolean flag = false; - for (String workerZNode : workerZNodeList){ - if (workerZNode.startsWith(OSUtils.getHost())){ - flag = true; - break; - } - } - - if (flag){ - logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost()); - // exit system - System.exit(-1); - } + if(checkZKNodeExists(OSUtils.getHost(), ZKNodeType.WORKER)){ + logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost()); + System.exit(-1); } -// String heartbeatZKInfo = getOsInfo(now); -// workerZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(workerZNode, -// heartbeatZKInfo.getBytes()); - + // create worker zknode initWorkZNode(); // handle dead server handleDeadServer(workerZNode, Constants.WORKER_PREFIX, Constants.DELETE_ZK_OP); - - // delete worker server from database - serverDao.deleteWorker(OSUtils.getHost()); - - // register worker znode - serverDao.registerWorker(OSUtils.getHost(), - OSUtils.getProcessID(), - workerZNode, - ResInfo.getResInfoJson(), - createTime, - createTime); } catch (Exception e) { logger.error("register worker failure : " + e.getMessage(),e); } @@ -198,7 +167,6 @@ public class ZKWorkerClient extends AbstractZKClient { break; case CHILD_REMOVED: String path = event.getData().getPath(); - // handle dead server, add to zk dead server path handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); @@ -211,22 +179,6 @@ public class ZKWorkerClient extends AbstractZKClient { logger.info("node deleted : {}", event.getData().getPath()); break; case CHILD_UPDATED: - if (event.getData().getPath().contains(OSUtils.getHost())){ - byte[] bytes = zkClient.getData().forPath(event.getData().getPath()); - String resInfoStr = new String(bytes); - String[] splits = resInfoStr.split(Constants.COMMA); - if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) { - return; - } - - // updateProcessInstance master info in database according to host - serverDao.updateWorker(OSUtils.getHost(), - OSUtils.getProcessID(), - ResInfo.getResInfoJson(Double.parseDouble(splits[2]) - ,Double.parseDouble(splits[3])), - DateUtils.stringToDate(splits[5])); - logger.debug("node updated : {}",event.getData().getPath()); - } break; default: break; From f57961472c966d05d5adc64edd7d419f81638130 Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 25 Jul 2019 14:49:01 +0800 Subject: [PATCH 02/11] fix bug: close zk links . --- docs/zh_CN/SUMMARY.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh_CN/SUMMARY.md b/docs/zh_CN/SUMMARY.md index 01f3acc6b4..2b153b60c5 100644 --- a/docs/zh_CN/SUMMARY.md +++ b/docs/zh_CN/SUMMARY.md @@ -29,7 +29,7 @@ * [开发环境搭建](后端开发文档.md#项目编译) * [自定义任务插件文档](任务插件开发.md#任务插件开发) -* [接口文档](http://52.82.13.76:8888/easyscheduler/doc.html?language=zh_CN&lang=cn) +* [接口文档](http://52.82.13.76:8888/escheduler/doc.html?language=zh_CN&lang=cn) * FAQ * [FAQ](EasyScheduler-FAQ.md) * 系统版本升级文档 From a9a89af5f2d26bd400ec2c6c7b8083753b824bde Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 25 Jul 2019 15:02:54 +0800 Subject: [PATCH 03/11] fix bug: close zk links . --- .../api/service/MonitorService.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java index 33d69859d2..e754fd10e0 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java @@ -19,6 +19,7 @@ package cn.escheduler.api.service; import cn.escheduler.api.enums.Status; import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.ZookeeperMonitor; +import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.dao.MonitorDBDao; import cn.escheduler.common.model.MasterServer; import cn.escheduler.dao.model.MonitorRecord; @@ -26,6 +27,7 @@ import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.ZookeeperRecord; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,9 +65,7 @@ public class MonitorService extends BaseService{ Map result = new HashMap<>(5); - ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor(); - List masterServers = zookeeperMonitor.getMasterServers(); - zookeeperMonitor.close(); + List masterServers = getServerList(true); result.put(Constants.DATA_LIST, masterServers); putMsg(result,Status.SUCCESS); @@ -99,13 +99,29 @@ public class MonitorService extends BaseService{ public Map queryWorker(User loginUser) { Map result = new HashMap<>(5); - ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor(); - List workerServers = zookeeperMonitor.getWorkerServers(); - zookeeperMonitor.close(); + List workerServers = getServerList(false); result.put(Constants.DATA_LIST, workerServers); putMsg(result,Status.SUCCESS); return result; } + + private List getServerList(boolean isMaster){ + List servers = new ArrayList<>(); + ZookeeperMonitor zookeeperMonitor = null; + try{ + zookeeperMonitor = new ZookeeperMonitor(); + ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER; + servers = zookeeperMonitor.getServers(zkNodeType); + }catch (Exception e){ + throw e; + }finally { + if(zookeeperMonitor != null){ + zookeeperMonitor.close(); + } + } + return servers; + } + } From 21a8479669ae18de8333e379c6c46bf192debbce Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 25 Jul 2019 15:09:40 +0800 Subject: [PATCH 04/11] delete unused code --- .../java/cn/escheduler/api/controller/MonitorController.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java index cba39d5403..8d48129f51 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/MonitorController.java @@ -23,8 +23,6 @@ import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.dao.model.User; import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 3191eb92ff5f08ee0be0be83fe73f6a669b99252 Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 25 Jul 2019 18:01:31 +0800 Subject: [PATCH 05/11] update zkclient --- .../common/zk/AbstractZKClient.java | 67 ++++++++----------- .../escheduler/server/zk/ZKMasterClient.java | 6 +- 2 files changed, 30 insertions(+), 43 deletions(-) diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 79f26c0e82..5d3586a301 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -211,49 +211,11 @@ public abstract class AbstractZKClient { return false; } - /** - * init system znode - */ - protected void initSystemZNode(){ - try { - // read master node parent path from conf - masterZNodeParentPath = getMasterZNodeParentPath(); - // read worker node parent path from conf - workerZNodeParentPath = getWorkerZNodeParentPath(); - - // read server node parent path from conf - deadServerZNodeParentPath = getDeadZNodeParentPath(); - - if(zkClient.checkExists().forPath(deadServerZNodeParentPath) == null){ - // create persistent dead server parent node - zkClient.create().creatingParentContainersIfNeeded() - .withMode(CreateMode.PERSISTENT).forPath(deadServerZNodeParentPath); - } - - if(zkClient.checkExists().forPath(masterZNodeParentPath) == null){ - // create persistent master parent node - zkClient.create().creatingParentContainersIfNeeded() - .withMode(CreateMode.PERSISTENT).forPath(masterZNodeParentPath); - } - - if(zkClient.checkExists().forPath(workerZNodeParentPath) == null){ - // create persistent worker parent node - zkClient.create().creatingParentContainersIfNeeded() - .withMode(CreateMode.PERSISTENT).forPath(workerZNodeParentPath); - } - - } catch (Exception e) { - logger.error("init system znode failed : " + e.getMessage(),e); - } - } - - public void removeDeadServerByHost(String host, String serverType) throws Exception { List deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath); for(String serverPath : deadServers){ if(serverPath.startsWith(serverType+UNDERLINE+host)){ - String server = deadServerZNodeParentPath + SINGLE_SLASH + serverPath; zkClient.delete().forPath(server); logger.info("{} server {} deleted from zk dead server path success" , serverType , host); @@ -394,10 +356,11 @@ public abstract class AbstractZKClient { * @return * @throws Exception */ - public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception { + public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) { String path = getZNodeParentPath(zkNodeType); if(StringUtils.isEmpty(path)){ - logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString()); + logger.error("check zk node exists error, host:{}, zk node type:{}", + host, zkNodeType.toString()); return false; } Map serverMaps = getServerList(zkNodeType); @@ -508,7 +471,31 @@ public abstract class AbstractZKClient { } } + /** + * init system znode + */ + protected void initSystemZNode(){ + try { + createNodePath(getMasterZNodeParentPath()); + createNodePath(getWorkerZNodeParentPath()); + createNodePath(getDeadZNodeParentPath()); + + } catch (Exception e) { + logger.error("init system znode failed : " + e.getMessage(),e); + } + } + /** + * create zookeeper node path if not exists + * @param zNodeParentPath + * @throws Exception + */ + private void createNodePath(String zNodeParentPath) throws Exception { + if(null == zkClient.checkExists().forPath(zNodeParentPath)){ + zkClient.create().creatingParentContainersIfNeeded() + .withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath); + } + } @Override public String toString() { diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index 50ff76a024..2e77bb6fc6 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -322,9 +322,9 @@ public class ZKMasterClient extends AbstractZKClient { // handle dead server handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); - // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/worker - String znodeLock = zkMasterClient.getWorkerFailoverLockPath(); - mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); + // create a distributed lock + String znodeLock = getWorkerFailoverLockPath(); + mutex = new InterProcessMutex(getZkClient(), znodeLock); mutex.acquire(); String workerHost = getHostByEventDataPath(path); From e4210d14514effc71b7e2b0737ed82781bea33f4 Mon Sep 17 00:00:00 2001 From: lenboo Date: Mon, 29 Jul 2019 18:52:10 +0800 Subject: [PATCH 06/11] refactor zk client. --- .../api/service/MonitorService.java | 8 +- .../api/utils/ZookeeperMonitor.java | 4 +- .../common/zk/AbstractZKClient.java | 94 ++++++++- .../server/master/MasterServer.java | 19 +- .../escheduler/server/zk/ZKMasterClient.java | 199 +++++++----------- .../escheduler/server/zk/ZKWorkerClient.java | 29 +-- 6 files changed, 177 insertions(+), 176 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java index e754fd10e0..cc6a847578 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/MonitorService.java @@ -65,7 +65,7 @@ public class MonitorService extends BaseService{ Map result = new HashMap<>(5); - List masterServers = getServerList(true); + List masterServers = getServerListFromZK(true); result.put(Constants.DATA_LIST, masterServers); putMsg(result,Status.SUCCESS); @@ -99,7 +99,7 @@ public class MonitorService extends BaseService{ public Map queryWorker(User loginUser) { Map result = new HashMap<>(5); - List workerServers = getServerList(false); + List workerServers = getServerListFromZK(false); result.put(Constants.DATA_LIST, workerServers); putMsg(result,Status.SUCCESS); @@ -107,13 +107,13 @@ public class MonitorService extends BaseService{ return result; } - private List getServerList(boolean isMaster){ + private List getServerListFromZK(boolean isMaster){ List servers = new ArrayList<>(); ZookeeperMonitor zookeeperMonitor = null; try{ zookeeperMonitor = new ZookeeperMonitor(); ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER; - servers = zookeeperMonitor.getServers(zkNodeType); + servers = zookeeperMonitor.getServersList(zkNodeType); }catch (Exception e){ throw e; }finally { diff --git a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java index ddbb6c9c1a..dc6f95d7ec 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/utils/ZookeeperMonitor.java @@ -40,7 +40,7 @@ public class ZookeeperMonitor extends AbstractZKClient{ * @return */ public List getMasterServers(){ - return getServers(ZKNodeType.MASTER); + return getServersList(ZKNodeType.MASTER); } /** @@ -48,7 +48,7 @@ public class ZookeeperMonitor extends AbstractZKClient{ * @return */ public List getWorkerServers(){ - return getServers(ZKNodeType.WORKER); + return getServersList(ZKNodeType.WORKER); } private static List zookeeperInfoList(String zookeeperServers) { diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 5d3586a301..46cb7a980e 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -20,7 +20,6 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.IStoppable; import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.model.MasterServer; -import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.ResInfo; @@ -223,28 +222,68 @@ public abstract class AbstractZKClient { } } + + /** + * create zookeeper path according the zk node type. + * @param zkNodeType + * @return + * @throws Exception + */ + private String createZNodePath(ZKNodeType zkNodeType) throws Exception { + // specify the format of stored data in ZK nodes + String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date()); + // create temporary sequence nodes for master znode + String parentPath = getZNodeParentPath(zkNodeType); + String serverPathPrefix = parentPath + "/" + OSUtils.getHost(); + String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( + serverPathPrefix + "_", heartbeatZKInfo.getBytes()); + logger.info("register {} node {} success" , zkNodeType.toString(), registerPath); + return registerPath; + } + + /** + * register server, if server already exists, return null. + * @param zkNodeType + * @return register server path in zookeeper + */ + public String registerServer(ZKNodeType zkNodeType) throws Exception { + String registerPath = null; + String host = OSUtils.getHost(); + if(checkZKNodeExists(host, zkNodeType)){ + logger.error("register failure , {} server already started on host : {}" , + zkNodeType.toString(), host); + return registerPath; + } + registerPath = createZNodePath(ZKNodeType.MASTER); + + // handle dead server + handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP); + + return registerPath; + } + /** * opType(add): if find dead server , then add to zk deadServerPath * opType(delete): delete path from zk * * @param zNode node path - * @param serverType master or worker prefix + * @param zkNodeType master or worker * @param opType delete or add * @throws Exception */ - public void handleDeadServer(String zNode, String serverType, String opType) throws Exception { + public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { //ip_sequenceno String[] zNodesPath = zNode.split("\\/"); String ipSeqNo = zNodesPath[zNodesPath.length - 1]; - String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; //check server restart, if restart , dead server path in zk should be delete if(opType.equals(DELETE_ZK_OP)){ String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE); String ip = ipAndSeqNo[0]; - removeDeadServerByHost(ip, serverType); + removeDeadServerByHost(ip, type); }else if(opType.equals(ADD_ZK_OP)){ String deadServerPath = deadServerZNodeParentPath + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; @@ -253,7 +292,8 @@ public abstract class AbstractZKClient { zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes()); - logger.info("{} server dead , and {} added to zk dead server path success" , serverType, zNode); + logger.info("{} server dead , and {} added to zk dead server path success" , + zkNodeType.toString(), zNode); } } @@ -314,8 +354,8 @@ public abstract class AbstractZKClient { * @param zkNodeType * @return */ - public List getServers(ZKNodeType zkNodeType){ - Map masterMap = getServerList(zkNodeType); + public List getServersList(ZKNodeType zkNodeType){ + Map masterMap = getServerMaps(zkNodeType); String parentPath = getZNodeParentPath(zkNodeType); List masterServers = new ArrayList<>(); @@ -332,7 +372,7 @@ public abstract class AbstractZKClient { * result : {host : resource info} * @return */ - public Map getServerList(ZKNodeType zkNodeType){ + public Map getServerMaps(ZKNodeType zkNodeType){ Map masterMap = new HashMap<>(); try { @@ -363,7 +403,7 @@ public abstract class AbstractZKClient { host, zkNodeType.toString()); return false; } - Map serverMaps = getServerList(zkNodeType); + Map serverMaps = getServerMaps(zkNodeType); for(String hostKey : serverMaps.keySet()){ if(hostKey.startsWith(host)){ return true; @@ -497,7 +537,39 @@ public abstract class AbstractZKClient { } } - @Override + /** + * server self dead, stop all threads + * @param serverHost + * @param zkNodeType + */ + protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) { + if (serverHost.equals(OSUtils.getHost())) { + logger.error("{} server({}) of myself dead , stopping...", + zkNodeType.toString(), serverHost); + stoppable.stop(String.format(" {} server {} of myself dead , stopping...", + zkNodeType.toString(), serverHost)); + return true; + } + return false; + } + + /** + * get host ip, string format: masterParentPath/ip_000001/value + * @param path + * @return + */ + protected String getHostByEventDataPath(String path) { + int startIndex = path.lastIndexOf("/")+1; + int endIndex = path.lastIndexOf("_"); + + if(startIndex >= endIndex){ + logger.error("parse ip error"); + return ""; + } + return path.substring(startIndex, endIndex); + } + + @Override public String toString() { return "AbstractZKClient{" + "zkClient=" + zkClient + diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java index bf0dcbfe75..562b6509e5 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java @@ -119,8 +119,6 @@ public class MasterServer implements CommandLineRunner, IStoppable { public MasterServer(ProcessDao processDao){ zkMasterClient = ZKMasterClient.getZKMasterClient(processDao); - this.serverDao = zkMasterClient.getServerDao(); - this.alertDao = zkMasterClient.getAlertDao(); } public void run(ProcessDao processDao){ @@ -128,6 +126,11 @@ public class MasterServer implements CommandLineRunner, IStoppable { heartBeatInterval = conf.getInt(Constants.MASTER_HEARTBEAT_INTERVAL, Constants.defaultMasterHeartbeatInterval); + // master exec thread pool num + int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS, + Constants.defaultMasterExecThreadNum); + + heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum); // heartbeat thread implement @@ -140,10 +143,6 @@ public class MasterServer implements CommandLineRunner, IStoppable { heartbeatMasterService. scheduleAtFixedRate(heartBeatThread, 5, heartBeatInterval, TimeUnit.SECONDS); - // master exec thread pool num - int masterExecThreadNum = conf.getInt(Constants.MASTER_EXEC_THREADS, - Constants.defaultMasterExecThreadNum); - // master scheduler thread MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread( zkMasterClient, @@ -154,6 +153,8 @@ public class MasterServer implements CommandLineRunner, IStoppable { masterSchedulerService.execute(masterSchedulerThread); // start QuartzExecutors + // TODO... + // what system should do if exception try { ProcessScheduleJob.init(processDao); QuartzExecutors.getInstance().start(); @@ -173,13 +174,11 @@ public class MasterServer implements CommandLineRunner, IStoppable { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - String host = OSUtils.getHost(); - // clear master table register info - serverDao.deleteMaster(host); logger.info("master server stopped"); if (zkMasterClient.getActiveMasterNum() <= 1) { for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, host, "Master-Server"); + zkMasterClient.getAlertDao().sendServerStopedAlert( + 1, OSUtils.getHost(), "Master-Server"); } } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index 2e77bb6fc6..c972580e31 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -20,8 +20,6 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.model.MasterServer; -import cn.escheduler.common.utils.CollectionUtils; -import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.DaoFactory; @@ -29,8 +27,6 @@ import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ServerDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; -import cn.escheduler.dao.model.WorkerServer; -import cn.escheduler.common.utils.ResInfo; import cn.escheduler.server.utils.ProcessUtils; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; @@ -39,7 +35,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.utils.ThreadUtils; -import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +63,7 @@ public class ZKMasterClient extends AbstractZKClient { * master database access */ private ServerDao serverDao = null; + /** * alert database access */ @@ -77,9 +73,6 @@ public class ZKMasterClient extends AbstractZKClient { */ private ProcessDao processDao; - - private Date createTime = null; - /** * zkMasterClient */ @@ -131,7 +124,7 @@ public class ZKMasterClient extends AbstractZKClient { this.listenerWorker(); // register master - this.registMaster(); + this.registerMaster(); // check if fault tolerance is required,failure and tolerance if (getActiveMasterNum() == 1) { @@ -157,15 +150,6 @@ public class ZKMasterClient extends AbstractZKClient { this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } - - /** - * get maste dao - * @return - */ - public ServerDao getServerDao(){ - return serverDao; - } - /** * get alert dao * @return @@ -174,40 +158,24 @@ public class ZKMasterClient extends AbstractZKClient { return alertDao; } + + + /** * register master znode */ - public void registMaster(){ - - // get current date - Date now = new Date(); - createTime = now ; + public void registerMaster(){ try { - String osHost = OSUtils.getHost(); - - // zookeeper node exists, cannot start a new one. - if(checkZKNodeExists(osHost, ZKNodeType.MASTER)){ - logger.error("register failure , master already started on host : {}" , osHost); - // exit system - System.exit(-1); + String serverPath = registerServer(ZKNodeType.MASTER); + if(StringUtils.isEmpty(serverPath)){ + System.exit(-1); } - createMasterZNode(now); - logger.info("register master node {} success" , masterZNode); - - // handle dead server - handleDeadServer(masterZNode, Constants.MASTER_PREFIX, Constants.DELETE_ZK_OP); } catch (Exception e) { logger.error("register master failure : " + e.getMessage(),e); + System.exit(-1); } } - private void createMasterZNode(Date now) throws Exception { - // specify the format of stored data in ZK nodes - String heartbeatZKInfo = ResInfo.getHeartBeatInfo(now); - // create temporary sequence nodes for master znode - masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( - masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes()); - } /** @@ -217,8 +185,6 @@ public class ZKMasterClient extends AbstractZKClient { PathChildrenCache masterPc = new PathChildrenCache(zkClient, masterZNodeParentPath, true ,defaultThreadFactory); try { - Date now = new Date(); - createTime = now ; masterPc.start(); masterPc.getListenable().addListener(new PathChildrenCacheListener() { @Override @@ -229,8 +195,11 @@ public class ZKMasterClient extends AbstractZKClient { break; case CHILD_REMOVED: String path = event.getData().getPath(); - logger.info("master node deleted : {}",event.getData().getPath()); - removeMasterNode(path); + String serverHost = getHostByEventDataPath(path); + if(checkServerSelfDead(serverHost, ZKNodeType.MASTER)){ + return; + } + removeZKNodePath(path, ZKNodeType.MASTER, true); break; case CHILD_UPDATED: break; @@ -242,46 +211,69 @@ public class ZKMasterClient extends AbstractZKClient { }catch (Exception e){ logger.error("monitor master failed : " + e.getMessage(),e); } +} - } - - private void removeMasterNode(String path) { - InterProcessMutex mutexLock = null; + private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { + logger.info("{} node deleted : {}", zkNodeType.toString(), path); + InterProcessMutex mutex = null; try { - // handle dead server, add to zk dead server pth - handleDeadServer(path, Constants.MASTER_PREFIX, Constants.ADD_ZK_OP); + String failoverPath = getFailoverLockPath(zkNodeType); + // create a distributed lock + mutex = new InterProcessMutex(getZkClient(), failoverPath); + mutex.acquire(); - if(masterZNode.equals(path)){ - logger.error("master server({}) of myself dead , stopping...", path); - stoppable.stop(String.format("master server(%s) of myself dead , stopping...", path)); - return; + String serverHost = getHostByEventDataPath(path); + // handle dead server + handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); + //alert server down. + alertServerDown(serverHost, zkNodeType); + //failover server + if(failover){ + failoverServerWhenDown(serverHost, zkNodeType); } + }catch (Exception e){ + logger.error("{} server failover failed.", zkNodeType.toString()); + logger.error("failover exception : " + e.getMessage(),e); + } + finally { + releaseMutex(mutex); + } + } - // create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/master - String znodeLock = zkMasterClient.getMasterFailoverLockPath(); - mutexLock = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); - mutexLock.acquire(); + private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { + if(StringUtils.isEmpty(serverHost)){ + return ; + } + switch (zkNodeType){ + case MASTER: + failoverMaster(serverHost); + break; + case WORKER: + failoverWorker(serverHost, true); + default: + break; + } + } - String masterHost = getHostByEventDataPath(path); - for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, masterHost, "Master-Server"); - } - if(StringUtils.isNotEmpty(masterHost)){ - failoverMaster(masterHost); - } - }catch (Exception e){ - logger.error("master failover failed : " + e.getMessage(),e); - }finally { - if (mutexLock != null){ - try { - mutexLock.release(); - } catch (Exception e) { - logger.error("lock relase failed : " + e.getMessage(),e); - } - } + private String getFailoverLockPath(ZKNodeType zkNodeType){ + + switch (zkNodeType){ + case MASTER: + return getMasterFailoverLockPath(); + case WORKER: + return getWorkerFailoverLockPath(); + default: + return ""; } } + private void alertServerDown(String serverHost, ZKNodeType zkNodeType) { + + String serverType = zkNodeType.toString(); + for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER; i++) { + alertDao.sendServerStopedAlert(1, serverHost, serverType); + } + } /** * monitor worker @@ -290,8 +282,6 @@ public class ZKMasterClient extends AbstractZKClient { PathChildrenCache workerPc = new PathChildrenCache(zkClient,workerZNodeParentPath,true ,defaultThreadFactory); try { - Date now = new Date(); - createTime = now ; workerPc.start(); workerPc.getListenable().addListener(new PathChildrenCacheListener() { @Override @@ -303,7 +293,7 @@ public class ZKMasterClient extends AbstractZKClient { case CHILD_REMOVED: String path = event.getData().getPath(); logger.info("node deleted : {}",event.getData().getPath()); - removeZKNodePath(path); + removeZKNodePath(path, ZKNodeType.WORKER, true); break; default: break; @@ -315,33 +305,6 @@ public class ZKMasterClient extends AbstractZKClient { } } - private void removeZKNodePath(String path) { - InterProcessMutex mutex = null; - try { - - // handle dead server - handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); - - // create a distributed lock - String znodeLock = getWorkerFailoverLockPath(); - mutex = new InterProcessMutex(getZkClient(), znodeLock); - mutex.acquire(); - - String workerHost = getHostByEventDataPath(path); - for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { - alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server"); - } - - if(StringUtils.isNotEmpty(workerHost)){ - failoverWorker(workerHost, true); - } - }catch (Exception e){ - logger.error("worker failover failed : " + e.getMessage(),e); - } - finally { - releaseMutex(mutex); - } - } /** * get master znode @@ -381,7 +344,7 @@ public class ZKMasterClient extends AbstractZKClient { return false; } Date workerServerStartDate = null; - List workerServers= getServers(ZKNodeType.WORKER); + List workerServers= getServersList(ZKNodeType.WORKER); for(MasterServer server : workerServers){ if(server.getHost().equals(taskInstance.getHost())){ workerServerStartDate = server.getCreateTime(); @@ -444,24 +407,4 @@ public class ZKMasterClient extends AbstractZKClient { logger.info("master failover end"); } - /** - * get host ip, string format: masterParentPath/ip_000001/value - * @param path - * @return - */ - private String getHostByEventDataPath(String path) { - int startIndex = path.lastIndexOf("/")+1; - int endIndex = path.lastIndexOf("_"); - - if(startIndex >= endIndex){ - logger.error("parse ip error"); - return ""; - } - return path.substring(startIndex, endIndex); - } - - - - - } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java index 2090ec1bdd..0c35728f84 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKWorkerClient.java @@ -18,12 +18,12 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ZKNodeType; -import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.ServerDao; import cn.escheduler.common.utils.ResInfo; +import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; -import java.util.List; import java.util.concurrent.ThreadFactory; @@ -130,21 +129,14 @@ public class ZKWorkerClient extends AbstractZKClient { * register worker */ private void registWorker(){ - // get current date - Date now = new Date(); - createTime = now ; try { - if(checkZKNodeExists(OSUtils.getHost(), ZKNodeType.WORKER)){ - logger.info("register failure , worker already started on : {}, please wait for a moment and try again" , OSUtils.getHost()); + String serverPath = registerServer(ZKNodeType.WORKER); + if(StringUtils.isEmpty(serverPath)){ System.exit(-1); } - - // create worker zknode - initWorkZNode(); - // handle dead server - handleDeadServer(workerZNode, Constants.WORKER_PREFIX, Constants.DELETE_ZK_OP); } catch (Exception e) { logger.error("register worker failure : " + e.getMessage(),e); + System.exit(-1); } } @@ -167,16 +159,11 @@ public class ZKWorkerClient extends AbstractZKClient { break; case CHILD_REMOVED: String path = event.getData().getPath(); - // handle dead server, add to zk dead server path - handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP); - //find myself dead - if(workerZNode.equals(path)){ - - logger.warn(" worker server({}) of myself dead , stopping...", path); - stoppable.stop(String.format("worker server(%s) of myself dead , stopping",path)); - } - logger.info("node deleted : {}", event.getData().getPath()); + String serverHost = getHostByEventDataPath(path); + if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){ + return; + } break; case CHILD_UPDATED: break; From 651ba5f7651df47d0c497767f237a73bfb4cf803 Mon Sep 17 00:00:00 2001 From: lenboo Date: Wed, 31 Jul 2019 20:47:03 +0800 Subject: [PATCH 07/11] update readme --- README.md | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 182ece4c38..29d22a7619 100644 --- a/README.md +++ b/README.md @@ -27,27 +27,14 @@ Its main objectives are as follows: - There are more waiting partners to explore -### Comparison with similar scheduler systems - - -  | EasyScheduler | Azkaban | Airflow --- | -- | -- | -- -**Stability** |   |   |   -Single point of failure | Decentralized multi-master and multi-worker | Yes
Single Web and Scheduler Combination Node | Yes
Single Scheduler -Additional HA requirements | Not required (HA is supported by itself) | DB | Celery / Dask / Mesos + Load Balancer + DB -Overload processing | Task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured, when too many tasks will be cached in the task queue, will not cause machine jam. | Jammed the server when there are too many tasks | Jammed the server when there are too many tasks -**Easy to use** |   |   |   -DAG Monitoring Interface | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables and so on at a glance. | Only task status can be seen | Can't visually distinguish task types -Visual process definition | Yes
All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the api mode operation is provided. | No
DAG and custom upload via custom DSL | No
DAG is drawn through Python code, which is inconvenient to use, especially for business people who can't write code. -Quick deployment | One-click deployment | Complex clustering deployment | Complex clustering deployment -**Features** |   |   |   -Suspend and resume | Support pause, recover operation | No
Can only kill the workflow first and then re-run | No
Can only kill the workflow first and then re-run -Whether to support multiple tenants | Users on easyscheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. " Supports traditional shell tasks, while supporting large data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | No | No -Task type | Supports traditional shell tasks, and also support big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | shell、gobblin、hadoopJava、java、hive、pig、spark、hdfsToTeradata、teradataToHdfs | BashOperator、DummyOperator、MySqlOperator、HiveOperator、EmailOperator、HTTPOperator、SqlOperator -Compatibility | Support the scheduling of big data jobs like spark, hive, Mr. At the same time, it is more compatible with big data business because it supports multiple tenants. | Because it does not support multi-tenant, it is not flexible enough to use business in big data platform. | Because it does not support multi-tenant, it is not flexible enough to use business in big data platform. -**Scalability** |   |   |   -Whether to support custom task types | Yes | Yes | Yes -Is Cluster Extension Supported? | Yes
The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline. | Yes
but complicated Executor horizontal extend | Yes
but complicated Executor horizontal extend +### what's in the scheduler systems + + +  | Stability | Easy to use | Features | Scalability | +-- | -- | -- | -- | -- +Decentralized multi-master and multi-worker | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables and so on at a glance.  |  Support pause, recover operation | support custom task types +HA is supported by itself | All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the api mode operation is provided. | Users on easyscheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. " Supports traditional shell tasks, while supporting large data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline. + Overload processing: Task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured, when too many tasks will be cached in the task queue, will not cause machine jam. | One-click deployment | Supports traditional shell tasks, and also support big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | | From caec934939a3e4a5a3021cb6d24228aa88e46edf Mon Sep 17 00:00:00 2001 From: lenboo Date: Wed, 31 Jul 2019 20:48:18 +0800 Subject: [PATCH 08/11] update readme --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 29d22a7619..66b0242df0 100644 --- a/README.md +++ b/README.md @@ -30,11 +30,11 @@ Its main objectives are as follows: ### what's in the scheduler systems -  | Stability | Easy to use | Features | Scalability | --- | -- | -- | -- | -- + Stability | Easy to use | Features | Scalability | + -- | -- | -- | -- Decentralized multi-master and multi-worker | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables and so on at a glance.  |  Support pause, recover operation | support custom task types HA is supported by itself | All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the api mode operation is provided. | Users on easyscheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. " Supports traditional shell tasks, while supporting large data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline. - Overload processing: Task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured, when too many tasks will be cached in the task queue, will not cause machine jam. | One-click deployment | Supports traditional shell tasks, and also support big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | | +Overload processing: Task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured, when too many tasks will be cached in the task queue, will not cause machine jam. | One-click deployment | Supports traditional shell tasks, and also support big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | | From b2f5fde581618859a8009f8ed04ca719ae2fe30d Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 1 Aug 2019 10:42:54 +0800 Subject: [PATCH 09/11] update readme --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 66b0242df0..f1cafddec4 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,7 @@ Its main objectives are as follows: - There are more waiting partners to explore -### what's in the scheduler systems - +### What's in Easy Scheduler Stability | Easy to use | Features | Scalability | -- | -- | -- | -- From 67b3b5ad0f7bffda928e7e02373a0afb8aae15c5 Mon Sep 17 00:00:00 2001 From: lenboo Date: Mon, 5 Aug 2019 09:54:59 +0800 Subject: [PATCH 10/11] update 1.1.0 release note. --- docs/zh_CN/1.1.0-release.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/zh_CN/1.1.0-release.md b/docs/zh_CN/1.1.0-release.md index c3bf898b95..6c5e9ffdc9 100644 --- a/docs/zh_CN/1.1.0-release.md +++ b/docs/zh_CN/1.1.0-release.md @@ -29,6 +29,9 @@ Easy Scheduler 1.1.0是1.1.x系列中的第一个版本。 - [[EasyScheduler-389](https://github.com/analysys/EasyScheduler/issues/389)]service monitor cannot find the change of master/worker - [[EasyScheduler-511](https://github.com/analysys/EasyScheduler/issues/511)]support recovery process from stop/kill nodes. - [[EasyScheduler-399](https://github.com/analysys/EasyScheduler/issues/399)]HadoopUtils指定用户操作,而不是 **部署用户 +- [[EasyScheduler-378](https://github.com/analysys/EasyScheduler/issues/378)]Mailbox regular match +- [[EasyScheduler-625](https://github.com/analysys/EasyScheduler/issues/625)]EasyScheduler call shell "task instance not set host" +- [[EasyScheduler-622](https://github.com/analysys/EasyScheduler/issues/622)]Front-end interface deployment k8s, background deployment big data cluster session error 修复: === @@ -41,6 +44,9 @@ Easy Scheduler 1.1.0是1.1.x系列中的第一个版本。 - [[EasyScheduler-543](https://github.com/analysys/EasyScheduler/issues/543)]optimize datasource connection params safety - [[EasyScheduler-569](https://github.com/analysys/EasyScheduler/issues/569)]定时任务无法真正停止 - [[EasyScheduler-463](https://github.com/analysys/EasyScheduler/issues/463)]邮箱验证不支持非常见后缀邮箱 +- [[EasyScheduler-650](https://github.com/analysys/EasyScheduler/issues/650)]Creating a hive data source without a principal will cause the connection to fail +- [[EasyScheduler-641](https://github.com/analysys/EasyScheduler/issues/641)]The cellphone is not supported for 199 telecom segment when create a user +- [[EasyScheduler-627](https://github.com/analysys/EasyScheduler/issues/627)]Different sql node task logs in parallel in the same workflow will be mixed @@ -49,7 +55,7 @@ Easy Scheduler 1.1.0是1.1.x系列中的第一个版本。 === 最后但最重要的是,没有以下伙伴的贡献就没有新版本的诞生: -Baoqi, jimmy201602, samz406, petersear, millionfor, hyperknob, fanguanqun, yangqinlong, qq389401879, chgxtony, Stanfan, lfyee, thisnew, hujiang75277381, sunnyingit, lgbo-ustc, ivivi, lzy305, JackIllkid, telltime, lipengbo2018, wuchunfu, telltime +Baoqi, jimmy201602, samz406, petersear, millionfor, hyperknob, fanguanqun, yangqinlong, qq389401879, chgxtony, Stanfan, lfyee, thisnew, hujiang75277381, sunnyingit, lgbo-ustc, ivivi, lzy305, JackIllkid, telltime, lipengbo2018, wuchunfu, telltime, chenyuan9028, zhangzhipeng621, thisnew, 307526982, crazycarry 以及微信群里众多的热心伙伴!在此非常感谢! From 4116f4def44d057acd621eca3b2db25561e3a615 Mon Sep 17 00:00:00 2001 From: lenboo Date: Mon, 5 Aug 2019 10:27:31 +0800 Subject: [PATCH 11/11] update 1.1.0 release note --- docs/zh_CN/1.1.0-release.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/zh_CN/1.1.0-release.md b/docs/zh_CN/1.1.0-release.md index 6c5e9ffdc9..b603180708 100644 --- a/docs/zh_CN/1.1.0-release.md +++ b/docs/zh_CN/1.1.0-release.md @@ -47,6 +47,8 @@ Easy Scheduler 1.1.0是1.1.x系列中的第一个版本。 - [[EasyScheduler-650](https://github.com/analysys/EasyScheduler/issues/650)]Creating a hive data source without a principal will cause the connection to fail - [[EasyScheduler-641](https://github.com/analysys/EasyScheduler/issues/641)]The cellphone is not supported for 199 telecom segment when create a user - [[EasyScheduler-627](https://github.com/analysys/EasyScheduler/issues/627)]Different sql node task logs in parallel in the same workflow will be mixed +- [[EasyScheduler-655](https://github.com/analysys/EasyScheduler/issues/655)]when deploy a spark task,the tentant queue not empty,set with a empty queue name +- [[EasyScheduler-667](https://github.com/analysys/EasyScheduler/issues/667)]HivePreparedStatement can't print the actual SQL executed