diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 9ad359765f..212e5d9a88 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -18,9 +18,6 @@ package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; @@ -29,11 +26,8 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; -import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; import org.quartz.SchedulerException; import org.slf4j.Logger; @@ -44,7 +38,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import javax.annotation.PostConstruct; -import java.util.concurrent.ExecutorService; /** * master server @@ -57,23 +50,6 @@ public class MasterServer { */ private static final Logger logger = LoggerFactory.getLogger(MasterServer.class); - /** - * zk master client - */ - @Autowired - private ZKMasterClient zkMasterClient = null; - - /** - * process service - */ - @Autowired - protected ProcessService processService; - - /** - * master exec thread pool - */ - private ExecutorService masterSchedulerService; - /** * master config */ @@ -98,6 +74,12 @@ public class MasterServer { @Autowired private MasterRegistry masterRegistry; + /** + * zk master client + */ + @Autowired + private ZKMasterClient zkMasterClient; + /** * master server startup * @@ -125,27 +107,13 @@ public class MasterServer { this.nettyRemotingServer.start(); // + this.zkMasterClient.start(); this.masterRegistry.registry(); - // - zkMasterClient.init(); - - masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); - - // master scheduler thread - MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread( - zkMasterClient, - processService, - masterConfig.getMasterExecThreads()); - - // submit master scheduler thread - masterSchedulerService.execute(masterSchedulerThread); - // start QuartzExecutors // what system should do if exception try { logger.info("start Quartz server..."); - ProcessScheduleJob.init(processService); QuartzExecutors.getInstance().start(); } catch (Exception e) { try { @@ -162,19 +130,15 @@ public class MasterServer { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - if (zkMasterClient.getActiveMasterNum() <= 1) { - zkMasterClient.getAlertDao().sendServerStopedAlert( - 1, OSUtils.getHost(), "Master-Server"); - } - close("shutdownhook"); + close("shutdownHook"); } })); } /** - * gracefully stop - * @param cause why stopping + * gracefully close + * @param cause */ public void close(String cause) { @@ -197,40 +161,15 @@ public class MasterServer { } this.nettyRemotingServer.close(); this.masterRegistry.unRegistry(); + this.zkMasterClient.close(); //close quartz try{ QuartzExecutors.getInstance().shutdown(); + logger.info("Quartz service stopped"); }catch (Exception e){ logger.warn("Quartz service stopped exception:{}",e.getMessage()); } - - logger.info("Quartz service stopped"); - - try { - ThreadPoolExecutors.getInstance().shutdown(); - }catch (Exception e){ - logger.warn("threadPool service stopped exception:{}",e.getMessage()); - } - - logger.info("threadPool service stopped"); - - try { - masterSchedulerService.shutdownNow(); - }catch (Exception e){ - logger.warn("master scheduler service stopped exception:{}",e.getMessage()); - } - - logger.info("master scheduler service stopped"); - - try { - zkMasterClient.close(); - }catch (Exception e){ - logger.warn("zookeeper service stopped exception:{}",e.getMessage()); - } - - logger.info("zookeeper service stopped"); - } catch (Exception e) { logger.error("master server stop exception ", e); System.exit(-1); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java similarity index 66% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 6e96164e65..a5598ee8c6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -28,49 +28,43 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.zk.AbstractZKClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; -import java.util.concurrent.ExecutorService; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.concurrent.ThreadPoolExecutor; /** * master scheduler thread */ -public class MasterSchedulerThread implements Runnable { +@Service +public class MasterSchedulerService extends Thread { /** * logger of MasterSchedulerThread */ - private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerThread.class); + private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); /** * master exec service */ - private final ExecutorService masterExecService; + private ThreadPoolExecutor masterExecService; /** * dolphinscheduler database interface */ - private final ProcessService processService; + @Autowired + private ProcessService processService; /** * zookeeper master client */ - private final ZKMasterClient zkMasterClient ; - - /** - * master exec thread num - */ - private int masterExecThreadNum; - - /** - * master config - */ - private MasterConfig masterConfig; + @Autowired + private ZKMasterClient zkMasterClient; /** * netty remoting client @@ -78,21 +72,25 @@ public class MasterSchedulerThread implements Runnable { private NettyRemotingClient nettyRemotingClient; + @Autowired + private MasterConfig masterConfig; + /** * constructor of MasterSchedulerThread - * @param zkClient zookeeper master client - * @param processService process service - * @param masterExecThreadNum master exec thread num */ - public MasterSchedulerThread(ZKMasterClient zkClient, ProcessService processService, int masterExecThreadNum){ - this.processService = processService; - this.zkMasterClient = zkClient; - this.masterExecThreadNum = masterExecThreadNum; - this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum); - this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); - // + @PostConstruct + public void init(){ + this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + super.setName("MasterSchedulerThread"); + super.start(); + } + + @PreDestroy + public void close(){ + nettyRemotingClient.close(); + logger.info("master schedule service stopped..."); } /** @@ -100,15 +98,10 @@ public class MasterSchedulerThread implements Runnable { */ @Override public void run() { - logger.info("master scheduler start successfully..."); + logger.info("master scheduler started"); while (Stopper.isRunning()){ - - // process instance - ProcessInstance processInstance = null; - InterProcessMutex mutex = null; try { - boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); if(!runCheckFlag) { Thread.sleep(Constants.SLEEP_TIME_MILLIS); @@ -116,21 +109,16 @@ public class MasterSchedulerThread implements Runnable { } if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { - // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters - String znodeLock = zkMasterClient.getMasterLockPath(); + mutex = zkMasterClient.blockAcquireMutex(); - mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); - mutex.acquire(); - - ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService; - int activeCount = poolExecutor.getActiveCount(); + int activeCount = masterExecService.getActiveCount(); // make sure to scan and delete command table in one transaction Command command = processService.findOneCommand(); if (command != null) { logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ - processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); + ProcessInstance processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterConfig.getMasterExecThreads() - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient)); @@ -144,15 +132,11 @@ public class MasterSchedulerThread implements Runnable { Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } - }catch (Exception e){ + } catch (Exception e){ logger.error("master scheduler thread exception",e); - }finally{ - AbstractZKClient.releaseMutex(mutex); + } finally{ + zkMasterClient.releaseMutex(mutex); } } - nettyRemotingClient.close(); - logger.info("master server stopped..."); } - - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java index 9a4a7caaf1..a437888f2b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.service.zk.AbstractListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +74,12 @@ public class ZookeeperNodeManager implements InitializingBean { @Autowired private ZookeeperRegistryCenter registryCenter; + /** + * alert dao + */ + @Autowired + private AlertDao alertDao; + /** * init listener * @throws Exception @@ -136,6 +143,7 @@ public class ZookeeperNodeManager implements InitializingBean { Set previousNodes = new HashSet<>(workerNodes); Set currentNodes = registryCenter.getWorkerGroupNodesDirectly(group); syncWorkerGroupNodes(group, currentNodes); + alertDao.sendServerStopedAlert(1, path, "WORKER"); } } catch (IllegalArgumentException ignore) { logger.warn(ignore.getMessage()); @@ -175,6 +183,7 @@ public class ZookeeperNodeManager implements InitializingBean { Set previousNodes = new HashSet<>(masterNodes); Set currentNodes = registryCenter.getMasterNodesDirectly(); syncMasterNodes(currentNodes); + alertDao.sendServerStopedAlert(1, path, "MASTER"); } } catch (Exception ex) { logger.error("MasterNodeListener capture data change and get data failed.", ex); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 441e8db892..e1872f7444 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; @@ -78,7 +77,6 @@ public class WorkerServer { * @param args arguments */ public static void main(String[] args) { - System.setProperty("spring.profiles.active","worker"); Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); } @@ -136,11 +134,6 @@ public class WorkerServer { this.nettyRemotingServer.close(); this.workerRegistry.unRegistry(); - try { - ThreadPoolExecutors.getInstance().shutdown(); - }catch (Exception e){ - logger.warn("threadPool service stopped exception:{}",e.getMessage()); - } } catch (Exception e) { logger.error("worker server stop exception ", e); System.exit(-1); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 7fc91dc9e2..a59cf3e397 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -16,22 +16,19 @@ */ package org.apache.dolphinscheduler.server.zk; +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; -import org.apache.curator.utils.ThreadUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; import org.slf4j.Logger; @@ -41,7 +38,6 @@ import org.springframework.stereotype.Component; import java.util.Date; import java.util.List; -import java.util.concurrent.ThreadFactory; /** @@ -57,46 +53,19 @@ public class ZKMasterClient extends AbstractZKClient { */ private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class); - /** - * thread factory - */ - private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory("Master-Main-Thread"); - - /** - * master znode - */ - private String masterZNode = null; - - /** - * alert database access - */ - private AlertDao alertDao = null; /** * process service */ @Autowired private ProcessService processService; - /** - * default constructor - */ - private ZKMasterClient(){} - - /** - * init - */ - public void init(){ - - logger.info("initialize master client..."); - - // init dao - this.initDao(); + public void start() { InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master String znodeLock = getMasterStartUpLockPath(); - mutex = new InterProcessMutex(zkClient, znodeLock); + mutex = new InterProcessMutex(getZkClient(), znodeLock); mutex.acquire(); // init system znode @@ -115,20 +84,9 @@ public class ZKMasterClient extends AbstractZKClient { } } - - /** - * init dao - */ - public void initDao(){ - this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); - } - /** - * get alert dao - * - * @return AlertDao - */ - public AlertDao getAlertDao() { - return alertDao; + @Override + public void close(){ + super.close(); } /** @@ -167,8 +125,6 @@ public class ZKMasterClient extends AbstractZKClient { String serverHost = getHostByEventDataPath(path); // handle dead server handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); - //alert server down. - alertServerDown(serverHost, zkNodeType); //failover server if(failover){ failoverServerWhenDown(serverHost, zkNodeType); @@ -222,18 +178,6 @@ public class ZKMasterClient extends AbstractZKClient { } } - /** - * send alert when server down - * - * @param serverHost server host - * @param zkNodeType zookeeper node type - */ - private void alertServerDown(String serverHost, ZKNodeType zkNodeType) { - - String serverType = zkNodeType.toString(); - alertDao.sendServerStopedAlert(1, serverHost, serverType); - } - /** * monitor master * @param event event @@ -271,16 +215,6 @@ public class ZKMasterClient extends AbstractZKClient { } } - - /** - * get master znode - * - * @return master zookeeper node - */ - public String getMasterZNode() { - return masterZNode; - } - /** * task needs failover if task start before worker starts * @@ -399,4 +333,10 @@ public class ZKMasterClient extends AbstractZKClient { logger.info("master failover end"); } + public InterProcessMutex blockAcquireMutex() throws Exception { + InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath()); + mutex.acquire(); + return mutex; + } + } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java index 69a80e65f5..d055e2de85 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.quartz.Job; import org.quartz.JobDataMap; @@ -44,18 +45,8 @@ public class ProcessScheduleJob implements Job { */ private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class); - /** - * process service - */ - private static ProcessService processService; - - - /** - * init - * @param processService process dao - */ - public static void init(ProcessService processService) { - ProcessScheduleJob.processService = processService; + public ProcessService getProcessService(){ + return SpringApplicationContext.getBean(ProcessService.class); } /** @@ -67,7 +58,7 @@ public class ProcessScheduleJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { - Assert.notNull(processService, "please call init() method first"); + Assert.notNull(getProcessService(), "please call init() method first"); JobDataMap dataMap = context.getJobDetail().getJobDataMap(); @@ -83,7 +74,7 @@ public class ProcessScheduleJob implements Job { logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId); // query schedule - Schedule schedule = processService.querySchedule(scheduleId); + Schedule schedule = getProcessService().querySchedule(scheduleId); if (schedule == null) { logger.warn("process schedule does not exist in db,delete schedule job in quartz, projectId:{}, scheduleId:{}", projectId, scheduleId); deleteJob(projectId, scheduleId); @@ -91,7 +82,7 @@ public class ProcessScheduleJob implements Job { } - ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId()); + ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId()); // release state : online/offline ReleaseState releaseState = processDefinition.getReleaseState(); if (processDefinition == null || releaseState == ReleaseState.OFFLINE) { @@ -111,7 +102,7 @@ public class ProcessScheduleJob implements Job { command.setWarningType(schedule.getWarningType()); command.setProcessInstancePriority(schedule.getProcessInstancePriority()); - processService.createCommand(command); + getProcessService().createCommand(command); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 24bf25984b..0b9fbe412a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -16,19 +16,15 @@ */ package org.apache.dolphinscheduler.service.zk; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import java.util.*; @@ -37,15 +33,11 @@ import static org.apache.dolphinscheduler.common.Constants.*; /** * abstract zookeeper client */ +@Component public abstract class AbstractZKClient extends ZookeeperCachedOperator { private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); - /** - * server stop or not - */ - protected IStoppable stoppable = null; - /** * check dead server or not , if dead, stop self * @@ -65,8 +57,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { if(!isExisted(zNode) || isExisted(deadServerPath)){ return true; } - - return false; } @@ -99,28 +89,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { return registerPath; } - /** - * register server, if server already exists, return null. - * @param zkNodeType zookeeper node type - * @return register server path in zookeeper - * @throws Exception errors - */ - public String registerServer(ZKNodeType zkNodeType) throws Exception { - String registerPath = null; - String host = OSUtils.getHost(); - if(checkZKNodeExists(host, zkNodeType)){ - logger.error("register failure , {} server already started on host : {}" , - zkNodeType.toString(), host); - return registerPath; - } - registerPath = createZNodePath(zkNodeType, host); - - // handle dead server - handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP); - - return registerPath; - } - /** * opType(add): if find dead server , then add to zk deadServerPath * opType(delete): delete path from zk @@ -152,16 +120,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { } - - - /** - * for stop server - * @param serverStoppable server stoppable interface - */ - public void setStoppable(IStoppable serverStoppable){ - this.stoppable = serverStoppable; - } - /** * get active master num * @return active master number @@ -275,14 +233,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS; } - /** - * - * @return get master lock path - */ - public String getWorkerLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS; - } - /** * * @param zkNodeType zookeeper node type @@ -339,7 +289,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { * release mutex * @param mutex mutex */ - public static void releaseMutex(InterProcessMutex mutex) { + public void releaseMutex(InterProcessMutex mutex) { if (mutex != null){ try { mutex.release(); @@ -387,18 +337,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { return pathArray[pathArray.length - 1]; } - /** - * acquire zk lock - * @param zkClient zk client - * @param zNodeLockPath zk lock path - * @return zk lock - * @throws Exception errors - */ - public InterProcessMutex acquireZkLock(CuratorFramework zkClient,String zNodeLockPath)throws Exception{ - InterProcessMutex mutex = new InterProcessMutex(zkClient, zNodeLockPath); - mutex.acquire(); - return mutex; - } @Override public String toString() { @@ -407,7 +345,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' + ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' + ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' + - ", stoppable=" + stoppable + '}'; } }