diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java index 0a4ed9b5ac..d8ef0bb38d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java @@ -120,12 +120,24 @@ public class ThreadUtils { /** * Wrapper over ScheduledThreadPoolExecutor + * @param threadName * @param corePoolSize * @return */ - public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName,int corePoolSize) { + public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName, int corePoolSize) { + return newThreadScheduledExecutor(threadName, corePoolSize, true); + } + + /** + * Wrapper over ScheduledThreadPoolExecutor + * @param threadName + * @param corePoolSize + * @param isDaemon + * @return + */ + public static ScheduledExecutorService newThreadScheduledExecutor(String threadName, int corePoolSize, boolean isDaemon) { ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) + .setDaemon(isDaemon) .setNameFormat(threadName) .build(); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); @@ -135,7 +147,6 @@ public class ThreadUtils { return executor; } - public static ThreadInfo getThreadInfo(Thread t) { long tid = t.getId(); return threadBean.getThreadInfo(tid, STACK_DEPTH); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 6b5063cba4..9512b1a1c2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; +import org.apache.dolphinscheduler.server.worker.WorkerServer; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -37,8 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.WebApplicationType; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.FilterType; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -46,7 +49,9 @@ import java.util.concurrent.TimeUnit; /** * master server */ -@ComponentScan("org.apache.dolphinscheduler") +@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { + @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class}) +}) public class MasterServer implements IStoppable { /** @@ -112,7 +117,7 @@ public class MasterServer implements IStoppable { masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); - heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM); + heartbeatMasterService = ThreadUtils.newThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM, false); // heartbeat thread implement Runnable heartBeatThread = heartBeatThread(); @@ -147,23 +152,17 @@ public class MasterServer implements IStoppable { } logger.error("start Quartz failed", e); } - - - /** - * register hooks, which are called before the process exits - */ - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - if (zkMasterClient.getActiveMasterNum() <= 1) { - zkMasterClient.getAlertDao().sendServerStopedAlert( - 1, OSUtils.getHost(), "Master-Server"); - } - stop("shutdownhook"); - } - })); } + @PreDestroy + public void destroy() { + // master server exit alert + if (zkMasterClient.getActiveMasterNum() <= 1) { + zkMasterClient.getAlertDao().sendServerStopedAlert( + 1, OSUtils.getHost(), "Master-Server"); + } + stop("shutdownhook"); + } /** * gracefully stop diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index ace93079ff..86bb7d3e07 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.MasterServer; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread; @@ -43,10 +44,13 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.FilterType; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -56,7 +60,10 @@ import java.util.concurrent.TimeUnit; /** * worker server */ -@ComponentScan("org.apache.dolphinscheduler") +@SpringBootApplication +@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { + @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {MasterServer.class}) +}) public class WorkerServer implements IStoppable { /** @@ -104,11 +111,6 @@ public class WorkerServer implements IStoppable { */ private ExecutorService fetchTaskExecutorService; - /** - * CountDownLatch latch - */ - private CountDownLatch latch; - @Value("${server.is-combined-server:false}") private Boolean isCombinedServer; @@ -149,7 +151,7 @@ public class WorkerServer implements IStoppable { this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); - heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM); + heartbeatWorkerService = ThreadUtils.newThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM, false); // heartbeat thread implement Runnable heartBeatThread = heartBeatThread(); @@ -171,29 +173,15 @@ public class WorkerServer implements IStoppable { // submit fetch task thread fetchTaskExecutorService.execute(fetchTaskThread); + } - /** - * register hooks, which are called before the process exits - */ - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - // worker server exit alert - if (zkWorkerClient.getActiveMasterNum() <= 1) { - alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server"); - } - stop("shutdownhook"); - } - })); - - //let the main thread await - latch = new CountDownLatch(1); - if (!isCombinedServer) { - try { - latch.await(); - } catch (InterruptedException ignore) { - } + @PreDestroy + public void destroy() { + // worker server exit alert + if (zkWorkerClient.getActiveMasterNum() <= 1) { + alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server"); } + stop("shutdownhook"); } @Override @@ -251,7 +239,6 @@ public class WorkerServer implements IStoppable { }catch (Exception e){ logger.warn("zookeeper service stopped exception:{}",e.getMessage()); } - latch.countDown(); logger.info("zookeeper service stopped"); } catch (Exception e) {