From 76fd384a2eee088f6dc64b42cb5500806e0677d8 Mon Sep 17 00:00:00 2001 From: Technoboy- Date: Fri, 6 Mar 2020 21:13:48 +0800 Subject: [PATCH 1/3] refactor worker registry --- .../server/master/MasterServer.java | 16 +----- .../server/master/config/MasterConfig.java | 11 ++++ .../consumer/TaskUpdateQueueConsumer.java | 8 +++ .../master/registry/MasterRegistry.java | 42 +++++++------- .../server/worker/WorkerServer.java | 27 ++++----- .../server/worker/config/WorkerConfig.java | 11 ++++ .../worker/registry/WorkerRegistry.java | 57 +++++++------------ 7 files changed, 81 insertions(+), 91 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 292bfaea2c..9ad359765f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -26,19 +25,16 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; -import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,12 +80,6 @@ public class MasterServer { @Autowired private MasterConfig masterConfig; - /** - * zookeeper registry center - */ - @Autowired - private ZookeeperRegistryCenter zookeeperRegistryCenter; - /** * spring application context * only use it for initialization @@ -105,6 +95,7 @@ public class MasterServer { /** * master registry */ + @Autowired private MasterRegistry masterRegistry; /** @@ -126,7 +117,7 @@ public class MasterServer { //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(45678); + serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); @@ -134,7 +125,6 @@ public class MasterServer { this.nettyRemotingServer.start(); // - this.masterRegistry = new MasterRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), masterConfig.getMasterHeartbeatInterval()); this.masterRegistry.registry(); // @@ -166,8 +156,6 @@ public class MasterServer { logger.error("start Quartz failed", e); } - TaskUpdateQueueConsumer taskUpdateQueueConsumer = SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class); - taskUpdateQueueConsumer.start(); /** * register hooks, which are called before the process exits */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index e8a8ecbe43..7e6ae5618a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -46,6 +46,17 @@ public class MasterConfig { @Value("${master.host.selector:lowerWeight}") private String hostSelector; + @Value("${master.listen.port:45678}") + private int listenPort; + + public int getListenPort() { + return listenPort; + } + + public void setListenPort(int listenPort) { + this.listenPort = listenPort; + } + public String getHostSelector() { return hostSelector; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java index cccc700aea..e3957afc26 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java @@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; + /** * TaskUpdateQueue consumer */ @@ -66,6 +68,12 @@ public class TaskUpdateQueueConsumer extends Thread{ @Autowired private ExecutorDispatcher dispatcher; + @PostConstruct + public void init(){ + super.setName("TaskUpdateQueueConsumerThread"); + super.start(); + } + @Override public void run() { while (Stopper.isRunning()){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index 1eb06b6d65..0402520e57 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -23,10 +23,14 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -37,6 +41,7 @@ import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; /** * master registry */ +@Service public class MasterRegistry { private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class); @@ -44,38 +49,28 @@ public class MasterRegistry { /** * zookeeper registry center */ - private final ZookeeperRegistryCenter zookeeperRegistryCenter; + @Autowired + private ZookeeperRegistryCenter zookeeperRegistryCenter; /** - * port + * master config */ - private final int port; - - /** - * heartbeat interval - */ - private final long heartBeatInterval; + @Autowired + private MasterConfig masterConfig; /** * heartbeat executor */ - private final ScheduledExecutorService heartBeatExecutor; + private ScheduledExecutorService heartBeatExecutor; /** * worker start time */ - private final String startTime; + private String startTime; - /** - * construct - * @param zookeeperRegistryCenter zookeeperRegistryCenter - * @param port port - * @param heartBeatInterval heartBeatInterval - */ - public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ - this.zookeeperRegistryCenter = zookeeperRegistryCenter; - this.port = port; - this.heartBeatInterval = heartBeatInterval; + + @PostConstruct + public void init(){ this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -100,8 +95,9 @@ public class MasterRegistry { } } }); - this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS); - logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval); + int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); + this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); + logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); } /** @@ -129,7 +125,7 @@ public class MasterRegistry { * @return */ private String getLocalAddress(){ - return Constants.LOCAL_ADDRESS + ":" + port; + return Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort(); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 068d546b0c..441e8db892 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -19,11 +19,9 @@ package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; @@ -37,8 +35,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import javax.annotation.PostConstruct; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; /** * worker server @@ -51,18 +47,6 @@ public class WorkerServer { */ private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); - /** - * worker config - */ - @Autowired - private WorkerConfig workerConfig; - - /** - * zookeeper registry center - */ - @Autowired - private ZookeeperRegistryCenter zookeeperRegistryCenter; - /** * netty remote server */ @@ -71,8 +55,15 @@ public class WorkerServer { /** * worker registry */ + @Autowired private WorkerRegistry workerRegistry; + /** + * worker config + */ + @Autowired + private WorkerConfig workerConfig; + /** * spring application context * only use it for initialization @@ -87,6 +78,7 @@ public class WorkerServer { * @param args arguments */ public static void main(String[] args) { + System.setProperty("spring.profiles.active","worker"); Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); } @@ -101,12 +93,13 @@ public class WorkerServer { //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(workerConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.start(); - this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup()); + // this.workerRegistry.registry(); /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 3c7500aa8b..f3e701b6c9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -40,6 +40,17 @@ public class WorkerConfig { @Value("${worker.group: default}") private String workerGroup; + @Value("${worker.listen.port: 12345}") + private int listenPort; + + public int getListenPort() { + return listenPort; + } + + public void setListenPort(int listenPort) { + this.listenPort = listenPort; + } + public String getWorkerGroup() { return workerGroup; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index a1d55240b2..b42386a200 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -25,9 +25,13 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,6 +45,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLASH; /** * worker registry */ +@Service public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); @@ -48,54 +53,31 @@ public class WorkerRegistry { /** * zookeeper registry center */ - private final ZookeeperRegistryCenter zookeeperRegistryCenter; + @Autowired + private ZookeeperRegistryCenter zookeeperRegistryCenter; /** - * port + * worker config */ - private final int port; - - /** - * heartbeat interval - */ - private final long heartBeatInterval; + @Autowired + private WorkerConfig workerConfig; /** * heartbeat executor */ - private final ScheduledExecutorService heartBeatExecutor; + private ScheduledExecutorService heartBeatExecutor; /** * worker start time */ - private final String startTime; + private String startTime; - /** - * worker group - */ - private String workerGroup; - /** - * construct - * - * @param zookeeperRegistryCenter zookeeperRegistryCenter - * @param port port - * @param heartBeatInterval heartBeatInterval - */ - public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ - this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP); - } + private String workerGroup; - /** - * construct - * @param zookeeperRegistryCenter zookeeperRegistryCenter - * @param port port - */ - public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval, String workerGroup){ - this.zookeeperRegistryCenter = zookeeperRegistryCenter; - this.port = port; - this.heartBeatInterval = heartBeatInterval; - this.workerGroup = workerGroup; + @PostConstruct + public void init(){ + this.workerGroup = workerConfig.getWorkerGroup(); this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -120,8 +102,9 @@ public class WorkerRegistry { } } }); - this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS); - logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval); + int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); + this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); + logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval); } @@ -159,7 +142,7 @@ public class WorkerRegistry { * @return */ private String getLocalAddress(){ - return Constants.LOCAL_ADDRESS + ":" + port; + return Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort(); } /** From a4c2dfa3112f5df70ba13f4870a8bacab8d5d78d Mon Sep 17 00:00:00 2001 From: Technoboy- Date: Sun, 8 Mar 2020 13:26:09 +0800 Subject: [PATCH 2/3] refactor master server --- .../server/master/MasterServer.java | 85 +++--------------- ...hread.java => MasterSchedulerService.java} | 82 +++++++---------- .../server/registry/ZookeeperNodeManager.java | 9 ++ .../server/worker/WorkerServer.java | 7 -- .../server/zk/ZKMasterClient.java | 88 +++---------------- .../service/quartz/ProcessScheduleJob.java | 23 ++--- .../service/zk/AbstractZKClient.java | 69 +-------------- 7 files changed, 78 insertions(+), 285 deletions(-) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/{MasterSchedulerThread.java => MasterSchedulerService.java} (66%) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 9ad359765f..212e5d9a88 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -18,9 +18,6 @@ package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; @@ -29,11 +26,8 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; -import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; import org.quartz.SchedulerException; import org.slf4j.Logger; @@ -44,7 +38,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import javax.annotation.PostConstruct; -import java.util.concurrent.ExecutorService; /** * master server @@ -57,23 +50,6 @@ public class MasterServer { */ private static final Logger logger = LoggerFactory.getLogger(MasterServer.class); - /** - * zk master client - */ - @Autowired - private ZKMasterClient zkMasterClient = null; - - /** - * process service - */ - @Autowired - protected ProcessService processService; - - /** - * master exec thread pool - */ - private ExecutorService masterSchedulerService; - /** * master config */ @@ -98,6 +74,12 @@ public class MasterServer { @Autowired private MasterRegistry masterRegistry; + /** + * zk master client + */ + @Autowired + private ZKMasterClient zkMasterClient; + /** * master server startup * @@ -125,27 +107,13 @@ public class MasterServer { this.nettyRemotingServer.start(); // + this.zkMasterClient.start(); this.masterRegistry.registry(); - // - zkMasterClient.init(); - - masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); - - // master scheduler thread - MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread( - zkMasterClient, - processService, - masterConfig.getMasterExecThreads()); - - // submit master scheduler thread - masterSchedulerService.execute(masterSchedulerThread); - // start QuartzExecutors // what system should do if exception try { logger.info("start Quartz server..."); - ProcessScheduleJob.init(processService); QuartzExecutors.getInstance().start(); } catch (Exception e) { try { @@ -162,19 +130,15 @@ public class MasterServer { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - if (zkMasterClient.getActiveMasterNum() <= 1) { - zkMasterClient.getAlertDao().sendServerStopedAlert( - 1, OSUtils.getHost(), "Master-Server"); - } - close("shutdownhook"); + close("shutdownHook"); } })); } /** - * gracefully stop - * @param cause why stopping + * gracefully close + * @param cause */ public void close(String cause) { @@ -197,40 +161,15 @@ public class MasterServer { } this.nettyRemotingServer.close(); this.masterRegistry.unRegistry(); + this.zkMasterClient.close(); //close quartz try{ QuartzExecutors.getInstance().shutdown(); + logger.info("Quartz service stopped"); }catch (Exception e){ logger.warn("Quartz service stopped exception:{}",e.getMessage()); } - - logger.info("Quartz service stopped"); - - try { - ThreadPoolExecutors.getInstance().shutdown(); - }catch (Exception e){ - logger.warn("threadPool service stopped exception:{}",e.getMessage()); - } - - logger.info("threadPool service stopped"); - - try { - masterSchedulerService.shutdownNow(); - }catch (Exception e){ - logger.warn("master scheduler service stopped exception:{}",e.getMessage()); - } - - logger.info("master scheduler service stopped"); - - try { - zkMasterClient.close(); - }catch (Exception e){ - logger.warn("zookeeper service stopped exception:{}",e.getMessage()); - } - - logger.info("zookeeper service stopped"); - } catch (Exception e) { logger.error("master server stop exception ", e); System.exit(-1); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java similarity index 66% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 6e96164e65..a5598ee8c6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -28,49 +28,43 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.zk.AbstractZKClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; -import java.util.concurrent.ExecutorService; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.concurrent.ThreadPoolExecutor; /** * master scheduler thread */ -public class MasterSchedulerThread implements Runnable { +@Service +public class MasterSchedulerService extends Thread { /** * logger of MasterSchedulerThread */ - private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerThread.class); + private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); /** * master exec service */ - private final ExecutorService masterExecService; + private ThreadPoolExecutor masterExecService; /** * dolphinscheduler database interface */ - private final ProcessService processService; + @Autowired + private ProcessService processService; /** * zookeeper master client */ - private final ZKMasterClient zkMasterClient ; - - /** - * master exec thread num - */ - private int masterExecThreadNum; - - /** - * master config - */ - private MasterConfig masterConfig; + @Autowired + private ZKMasterClient zkMasterClient; /** * netty remoting client @@ -78,21 +72,25 @@ public class MasterSchedulerThread implements Runnable { private NettyRemotingClient nettyRemotingClient; + @Autowired + private MasterConfig masterConfig; + /** * constructor of MasterSchedulerThread - * @param zkClient zookeeper master client - * @param processService process service - * @param masterExecThreadNum master exec thread num */ - public MasterSchedulerThread(ZKMasterClient zkClient, ProcessService processService, int masterExecThreadNum){ - this.processService = processService; - this.zkMasterClient = zkClient; - this.masterExecThreadNum = masterExecThreadNum; - this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum); - this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); - // + @PostConstruct + public void init(){ + this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + super.setName("MasterSchedulerThread"); + super.start(); + } + + @PreDestroy + public void close(){ + nettyRemotingClient.close(); + logger.info("master schedule service stopped..."); } /** @@ -100,15 +98,10 @@ public class MasterSchedulerThread implements Runnable { */ @Override public void run() { - logger.info("master scheduler start successfully..."); + logger.info("master scheduler started"); while (Stopper.isRunning()){ - - // process instance - ProcessInstance processInstance = null; - InterProcessMutex mutex = null; try { - boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); if(!runCheckFlag) { Thread.sleep(Constants.SLEEP_TIME_MILLIS); @@ -116,21 +109,16 @@ public class MasterSchedulerThread implements Runnable { } if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { - // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters - String znodeLock = zkMasterClient.getMasterLockPath(); + mutex = zkMasterClient.blockAcquireMutex(); - mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); - mutex.acquire(); - - ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService; - int activeCount = poolExecutor.getActiveCount(); + int activeCount = masterExecService.getActiveCount(); // make sure to scan and delete command table in one transaction Command command = processService.findOneCommand(); if (command != null) { logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ - processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); + ProcessInstance processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterConfig.getMasterExecThreads() - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient)); @@ -144,15 +132,11 @@ public class MasterSchedulerThread implements Runnable { Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } - }catch (Exception e){ + } catch (Exception e){ logger.error("master scheduler thread exception",e); - }finally{ - AbstractZKClient.releaseMutex(mutex); + } finally{ + zkMasterClient.releaseMutex(mutex); } } - nettyRemotingClient.close(); - logger.info("master server stopped..."); } - - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java index 9a4a7caaf1..a437888f2b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.service.zk.AbstractListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +74,12 @@ public class ZookeeperNodeManager implements InitializingBean { @Autowired private ZookeeperRegistryCenter registryCenter; + /** + * alert dao + */ + @Autowired + private AlertDao alertDao; + /** * init listener * @throws Exception @@ -136,6 +143,7 @@ public class ZookeeperNodeManager implements InitializingBean { Set previousNodes = new HashSet<>(workerNodes); Set currentNodes = registryCenter.getWorkerGroupNodesDirectly(group); syncWorkerGroupNodes(group, currentNodes); + alertDao.sendServerStopedAlert(1, path, "WORKER"); } } catch (IllegalArgumentException ignore) { logger.warn(ignore.getMessage()); @@ -175,6 +183,7 @@ public class ZookeeperNodeManager implements InitializingBean { Set previousNodes = new HashSet<>(masterNodes); Set currentNodes = registryCenter.getMasterNodesDirectly(); syncMasterNodes(currentNodes); + alertDao.sendServerStopedAlert(1, path, "MASTER"); } } catch (Exception ex) { logger.error("MasterNodeListener capture data change and get data failed.", ex); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 441e8db892..e1872f7444 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; @@ -78,7 +77,6 @@ public class WorkerServer { * @param args arguments */ public static void main(String[] args) { - System.setProperty("spring.profiles.active","worker"); Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); } @@ -136,11 +134,6 @@ public class WorkerServer { this.nettyRemotingServer.close(); this.workerRegistry.unRegistry(); - try { - ThreadPoolExecutors.getInstance().shutdown(); - }catch (Exception e){ - logger.warn("threadPool service stopped exception:{}",e.getMessage()); - } } catch (Exception e) { logger.error("worker server stop exception ", e); System.exit(-1); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 7fc91dc9e2..a59cf3e397 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -16,22 +16,19 @@ */ package org.apache.dolphinscheduler.server.zk; +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; -import org.apache.curator.utils.ThreadUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; import org.slf4j.Logger; @@ -41,7 +38,6 @@ import org.springframework.stereotype.Component; import java.util.Date; import java.util.List; -import java.util.concurrent.ThreadFactory; /** @@ -57,46 +53,19 @@ public class ZKMasterClient extends AbstractZKClient { */ private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class); - /** - * thread factory - */ - private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory("Master-Main-Thread"); - - /** - * master znode - */ - private String masterZNode = null; - - /** - * alert database access - */ - private AlertDao alertDao = null; /** * process service */ @Autowired private ProcessService processService; - /** - * default constructor - */ - private ZKMasterClient(){} - - /** - * init - */ - public void init(){ - - logger.info("initialize master client..."); - - // init dao - this.initDao(); + public void start() { InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master String znodeLock = getMasterStartUpLockPath(); - mutex = new InterProcessMutex(zkClient, znodeLock); + mutex = new InterProcessMutex(getZkClient(), znodeLock); mutex.acquire(); // init system znode @@ -115,20 +84,9 @@ public class ZKMasterClient extends AbstractZKClient { } } - - /** - * init dao - */ - public void initDao(){ - this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); - } - /** - * get alert dao - * - * @return AlertDao - */ - public AlertDao getAlertDao() { - return alertDao; + @Override + public void close(){ + super.close(); } /** @@ -167,8 +125,6 @@ public class ZKMasterClient extends AbstractZKClient { String serverHost = getHostByEventDataPath(path); // handle dead server handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); - //alert server down. - alertServerDown(serverHost, zkNodeType); //failover server if(failover){ failoverServerWhenDown(serverHost, zkNodeType); @@ -222,18 +178,6 @@ public class ZKMasterClient extends AbstractZKClient { } } - /** - * send alert when server down - * - * @param serverHost server host - * @param zkNodeType zookeeper node type - */ - private void alertServerDown(String serverHost, ZKNodeType zkNodeType) { - - String serverType = zkNodeType.toString(); - alertDao.sendServerStopedAlert(1, serverHost, serverType); - } - /** * monitor master * @param event event @@ -271,16 +215,6 @@ public class ZKMasterClient extends AbstractZKClient { } } - - /** - * get master znode - * - * @return master zookeeper node - */ - public String getMasterZNode() { - return masterZNode; - } - /** * task needs failover if task start before worker starts * @@ -399,4 +333,10 @@ public class ZKMasterClient extends AbstractZKClient { logger.info("master failover end"); } + public InterProcessMutex blockAcquireMutex() throws Exception { + InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath()); + mutex.acquire(); + return mutex; + } + } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java index 69a80e65f5..d055e2de85 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.quartz.Job; import org.quartz.JobDataMap; @@ -44,18 +45,8 @@ public class ProcessScheduleJob implements Job { */ private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class); - /** - * process service - */ - private static ProcessService processService; - - - /** - * init - * @param processService process dao - */ - public static void init(ProcessService processService) { - ProcessScheduleJob.processService = processService; + public ProcessService getProcessService(){ + return SpringApplicationContext.getBean(ProcessService.class); } /** @@ -67,7 +58,7 @@ public class ProcessScheduleJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { - Assert.notNull(processService, "please call init() method first"); + Assert.notNull(getProcessService(), "please call init() method first"); JobDataMap dataMap = context.getJobDetail().getJobDataMap(); @@ -83,7 +74,7 @@ public class ProcessScheduleJob implements Job { logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId); // query schedule - Schedule schedule = processService.querySchedule(scheduleId); + Schedule schedule = getProcessService().querySchedule(scheduleId); if (schedule == null) { logger.warn("process schedule does not exist in db,delete schedule job in quartz, projectId:{}, scheduleId:{}", projectId, scheduleId); deleteJob(projectId, scheduleId); @@ -91,7 +82,7 @@ public class ProcessScheduleJob implements Job { } - ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId()); + ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId()); // release state : online/offline ReleaseState releaseState = processDefinition.getReleaseState(); if (processDefinition == null || releaseState == ReleaseState.OFFLINE) { @@ -111,7 +102,7 @@ public class ProcessScheduleJob implements Job { command.setWarningType(schedule.getWarningType()); command.setProcessInstancePriority(schedule.getProcessInstancePriority()); - processService.createCommand(command); + getProcessService().createCommand(command); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 24bf25984b..0b9fbe412a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -16,19 +16,15 @@ */ package org.apache.dolphinscheduler.service.zk; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import java.util.*; @@ -37,15 +33,11 @@ import static org.apache.dolphinscheduler.common.Constants.*; /** * abstract zookeeper client */ +@Component public abstract class AbstractZKClient extends ZookeeperCachedOperator { private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); - /** - * server stop or not - */ - protected IStoppable stoppable = null; - /** * check dead server or not , if dead, stop self * @@ -65,8 +57,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { if(!isExisted(zNode) || isExisted(deadServerPath)){ return true; } - - return false; } @@ -99,28 +89,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { return registerPath; } - /** - * register server, if server already exists, return null. - * @param zkNodeType zookeeper node type - * @return register server path in zookeeper - * @throws Exception errors - */ - public String registerServer(ZKNodeType zkNodeType) throws Exception { - String registerPath = null; - String host = OSUtils.getHost(); - if(checkZKNodeExists(host, zkNodeType)){ - logger.error("register failure , {} server already started on host : {}" , - zkNodeType.toString(), host); - return registerPath; - } - registerPath = createZNodePath(zkNodeType, host); - - // handle dead server - handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP); - - return registerPath; - } - /** * opType(add): if find dead server , then add to zk deadServerPath * opType(delete): delete path from zk @@ -152,16 +120,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { } - - - /** - * for stop server - * @param serverStoppable server stoppable interface - */ - public void setStoppable(IStoppable serverStoppable){ - this.stoppable = serverStoppable; - } - /** * get active master num * @return active master number @@ -275,14 +233,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS; } - /** - * - * @return get master lock path - */ - public String getWorkerLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS; - } - /** * * @param zkNodeType zookeeper node type @@ -339,7 +289,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { * release mutex * @param mutex mutex */ - public static void releaseMutex(InterProcessMutex mutex) { + public void releaseMutex(InterProcessMutex mutex) { if (mutex != null){ try { mutex.release(); @@ -387,18 +337,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { return pathArray[pathArray.length - 1]; } - /** - * acquire zk lock - * @param zkClient zk client - * @param zNodeLockPath zk lock path - * @return zk lock - * @throws Exception errors - */ - public InterProcessMutex acquireZkLock(CuratorFramework zkClient,String zNodeLockPath)throws Exception{ - InterProcessMutex mutex = new InterProcessMutex(zkClient, zNodeLockPath); - mutex.acquire(); - return mutex; - } @Override public String toString() { @@ -407,7 +345,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' + ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' + ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' + - ", stoppable=" + stoppable + '}'; } } From cc97052cc0c2e4c69768150b8c8ec50fb3583a42 Mon Sep 17 00:00:00 2001 From: Technoboy- Date: Mon, 9 Mar 2020 10:23:25 +0800 Subject: [PATCH 3/3] refactor MasterSchedulerService --- .../server/master/MasterServer.java | 7 +++++ .../master/runner/MasterSchedulerService.java | 26 +++++++++---------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 212e5d9a88..0f3656b221 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; +import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; @@ -80,6 +81,9 @@ public class MasterServer { @Autowired private ZKMasterClient zkMasterClient; + @Autowired + private MasterSchedulerService masterSchedulerService; + /** * master server startup * @@ -109,6 +113,8 @@ public class MasterServer { // this.zkMasterClient.start(); this.masterRegistry.registry(); + // + masterSchedulerService.start(); // start QuartzExecutors // what system should do if exception @@ -162,6 +168,7 @@ public class MasterServer { this.nettyRemotingServer.close(); this.masterRegistry.unRegistry(); this.zkMasterClient.close(); + this.masterSchedulerService.close(); //close quartz try{ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index a5598ee8c6..6949ada022 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -34,8 +34,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.util.concurrent.ThreadPoolExecutor; /** @@ -49,11 +47,6 @@ public class MasterSchedulerService extends Thread { */ private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); - /** - * master exec service - */ - private ThreadPoolExecutor masterExecService; - /** * dolphinscheduler database interface */ @@ -66,28 +59,33 @@ public class MasterSchedulerService extends Thread { @Autowired private ZKMasterClient zkMasterClient; + @Autowired + private MasterConfig masterConfig; + /** * netty remoting client */ - private NettyRemotingClient nettyRemotingClient; - + private final NettyRemotingClient nettyRemotingClient; - @Autowired - private MasterConfig masterConfig; + /** + * master exec service + */ + private final ThreadPoolExecutor masterExecService; /** * constructor of MasterSchedulerThread */ - @PostConstruct - public void init(){ + public MasterSchedulerService(){ this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + } + + public void start(){ super.setName("MasterSchedulerThread"); super.start(); } - @PreDestroy public void close(){ nettyRemotingClient.close(); logger.info("master schedule service stopped...");