Browse Source

The master and worker server exit exception #2163 (#2176)

* fix: #2163

* fix: format
pull/3/MERGE
Rubik-W 5 years ago committed by gaojun2048
parent
commit
8b61afc9be
  1. 17
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
  2. 33
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  3. 45
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

17
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java

@ -120,12 +120,24 @@ public class ThreadUtils {
/** /**
* Wrapper over ScheduledThreadPoolExecutor * Wrapper over ScheduledThreadPoolExecutor
* @param threadName
* @param corePoolSize * @param corePoolSize
* @return * @return
*/ */
public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName,int corePoolSize) { public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName, int corePoolSize) {
return newThreadScheduledExecutor(threadName, corePoolSize, true);
}
/**
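* Wrapper over ScheduledThreadPoolExecutor that lets the caller choose whether the pool threads are daemon threads
* @param threadName name format applied to the threads created for the pool
* @param corePoolSize core pool size passed to the ScheduledThreadPoolExecutor
* @param isDaemon true to create daemon threads, false to create non-daemon threads
* @return the scheduled executor built on the created thread factory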
*/
public static ScheduledExecutorService newThreadScheduledExecutor(String threadName, int corePoolSize, boolean isDaemon) {
ThreadFactory threadFactory = new ThreadFactoryBuilder() ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true) .setDaemon(isDaemon)
.setNameFormat(threadName) .setNameFormat(threadName)
.build(); .build();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
@ -135,7 +147,6 @@ public class ThreadUtils {
return executor; return executor;
} }
public static ThreadInfo getThreadInfo(Thread t) { public static ThreadInfo getThreadInfo(Thread t) {
long tid = t.getId(); long tid = t.getId();
return threadBean.getThreadInfo(tid, STACK_DEPTH); return threadBean.getThreadInfo(tid, STACK_DEPTH);

33
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -37,8 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType; import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -46,7 +49,9 @@ import java.util.concurrent.TimeUnit;
/** /**
* master server * master server
*/ */
@ComponentScan("org.apache.dolphinscheduler") @ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class})
})
public class MasterServer implements IStoppable { public class MasterServer implements IStoppable {
/** /**
@ -112,7 +117,7 @@ public class MasterServer implements IStoppable {
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM); heartbeatMasterService = ThreadUtils.newThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM, false);
// heartbeat thread implement // heartbeat thread implement
Runnable heartBeatThread = heartBeatThread(); Runnable heartBeatThread = heartBeatThread();
@ -147,23 +152,17 @@ public class MasterServer implements IStoppable {
} }
logger.error("start Quartz failed", e); logger.error("start Quartz failed", e);
} }
/**
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
if (zkMasterClient.getActiveMasterNum() <= 1) {
zkMasterClient.getAlertDao().sendServerStopedAlert(
1, OSUtils.getHost(), "Master-Server");
}
stop("shutdownhook");
}
}));
} }
/**
 * Pre-destroy callback (invoked through {@code @PreDestroy}): sends the
 * "Master-Server" stopped alert when at most one active master is reported,
 * then stops this server.
 *
 * <p>The alert is best-effort. {@code stop} is called from a {@code finally}
 * block so that a failure while querying the active master count or sending
 * the alert cannot skip the shutdown itself; any such exception still
 * propagates to the caller after {@code stop} has run.
 */
@PreDestroy
public void destroy() {
    try {
        // master server exit alert, raised only when no more than one master is active
        if (zkMasterClient.getActiveMasterNum() <= 1) {
            zkMasterClient.getAlertDao().sendServerStopedAlert(
                    1, OSUtils.getHost(), "Master-Server");
        }
    } finally {
        // always release resources, even if the alert path above threw
        stop("shutdownhook");
    }
}
/** /**
* gracefully stop * gracefully stop

45
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.MasterServer;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread; import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
@ -43,10 +44,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.WebApplicationType; import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -56,7 +60,10 @@ import java.util.concurrent.TimeUnit;
/** /**
* worker server * worker server
*/ */
@ComponentScan("org.apache.dolphinscheduler") @SpringBootApplication
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {MasterServer.class})
})
public class WorkerServer implements IStoppable { public class WorkerServer implements IStoppable {
/** /**
@ -104,11 +111,6 @@ public class WorkerServer implements IStoppable {
*/ */
private ExecutorService fetchTaskExecutorService; private ExecutorService fetchTaskExecutorService;
/**
* CountDownLatch latch
*/
private CountDownLatch latch;
@Value("${server.is-combined-server:false}") @Value("${server.is-combined-server:false}")
private Boolean isCombinedServer; private Boolean isCombinedServer;
@ -149,7 +151,7 @@ public class WorkerServer implements IStoppable {
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM); heartbeatWorkerService = ThreadUtils.newThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM, false);
// heartbeat thread implement // heartbeat thread implement
Runnable heartBeatThread = heartBeatThread(); Runnable heartBeatThread = heartBeatThread();
@ -171,29 +173,15 @@ public class WorkerServer implements IStoppable {
// submit fetch task thread // submit fetch task thread
fetchTaskExecutorService.execute(fetchTaskThread); fetchTaskExecutorService.execute(fetchTaskThread);
}
/** @PreDestroy
* register hooks, which are called before the process exits public void destroy() {
*/ // worker server exit alert
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { if (zkWorkerClient.getActiveMasterNum() <= 1) {
@Override alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
public void run() {
// worker server exit alert
if (zkWorkerClient.getActiveMasterNum() <= 1) {
alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
}
stop("shutdownhook");
}
}));
//let the main thread await
latch = new CountDownLatch(1);
if (!isCombinedServer) {
try {
latch.await();
} catch (InterruptedException ignore) {
}
} }
stop("shutdownhook");
} }
@Override @Override
@ -251,7 +239,6 @@ public class WorkerServer implements IStoppable {
}catch (Exception e){ }catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage()); logger.warn("zookeeper service stopped exception:{}",e.getMessage());
} }
latch.countDown();
logger.info("zookeeper service stopped"); logger.info("zookeeper service stopped");
} catch (Exception e) { } catch (Exception e) {

Loading…
Cancel
Save