Browse Source

The master and worker server exit exception #2163 (#2176)

* fix: #2163

* fix: format
pull/2/head
Rubik-W 5 years ago committed by GitHub
parent
commit
80c6ce5711
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
  2. 33
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  3. 45
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

17
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java

@ -120,12 +120,24 @@ public class ThreadUtils {
/**
* Wrapper over ScheduledThreadPoolExecutor
* @param threadName
* @param corePoolSize
* @return
*/
public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName,int corePoolSize) {
public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName, int corePoolSize) {
return newThreadScheduledExecutor(threadName, corePoolSize, true);
}
/**
* Wrapper over ScheduledThreadPoolExecutor
* @param threadName
* @param corePoolSize
* @param isDaemon
* @return
*/
public static ScheduledExecutorService newThreadScheduledExecutor(String threadName, int corePoolSize, boolean isDaemon) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setDaemon(isDaemon)
.setNameFormat(threadName)
.build();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
@ -135,7 +147,6 @@ public class ThreadUtils {
return executor;
}
public static ThreadInfo getThreadInfo(Thread t) {
long tid = t.getId();
return threadBean.getThreadInfo(tid, STACK_DEPTH);

33
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -37,8 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -46,7 +49,9 @@ import java.util.concurrent.TimeUnit;
/**
* master server
*/
@ComponentScan("org.apache.dolphinscheduler")
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class})
})
public class MasterServer implements IStoppable {
/**
@ -112,7 +117,7 @@ public class MasterServer implements IStoppable {
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM);
heartbeatMasterService = ThreadUtils.newThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM, false);
// heartbeat thread implement
Runnable heartBeatThread = heartBeatThread();
@ -147,23 +152,17 @@ public class MasterServer implements IStoppable {
}
logger.error("start Quartz failed", e);
}
/**
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
if (zkMasterClient.getActiveMasterNum() <= 1) {
zkMasterClient.getAlertDao().sendServerStopedAlert(
1, OSUtils.getHost(), "Master-Server");
}
stop("shutdownhook");
}
}));
}
@PreDestroy
public void destroy() {
// master server exit alert
if (zkMasterClient.getActiveMasterNum() <= 1) {
zkMasterClient.getAlertDao().sendServerStopedAlert(
1, OSUtils.getHost(), "Master-Server");
}
stop("shutdownhook");
}
/**
* gracefully stop

45
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.MasterServer;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
@ -43,10 +44,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -56,7 +60,10 @@ import java.util.concurrent.TimeUnit;
/**
* worker server
*/
@ComponentScan("org.apache.dolphinscheduler")
@SpringBootApplication
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {MasterServer.class})
})
public class WorkerServer implements IStoppable {
/**
@ -104,11 +111,6 @@ public class WorkerServer implements IStoppable {
*/
private ExecutorService fetchTaskExecutorService;
/**
* CountDownLatch latch
*/
private CountDownLatch latch;
@Value("${server.is-combined-server:false}")
private Boolean isCombinedServer;
@ -149,7 +151,7 @@ public class WorkerServer implements IStoppable {
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM);
heartbeatWorkerService = ThreadUtils.newThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM, false);
// heartbeat thread implement
Runnable heartBeatThread = heartBeatThread();
@ -171,29 +173,15 @@ public class WorkerServer implements IStoppable {
// submit fetch task thread
fetchTaskExecutorService.execute(fetchTaskThread);
}
/**
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
// worker server exit alert
if (zkWorkerClient.getActiveMasterNum() <= 1) {
alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
}
stop("shutdownhook");
}
}));
//let the main thread await
latch = new CountDownLatch(1);
if (!isCombinedServer) {
try {
latch.await();
} catch (InterruptedException ignore) {
}
@PreDestroy
public void destroy() {
// worker server exit alert
if (zkWorkerClient.getActiveMasterNum() <= 1) {
alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
}
stop("shutdownhook");
}
@Override
@ -251,7 +239,6 @@ public class WorkerServer implements IStoppable {
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
}
latch.countDown();
logger.info("zookeeper service stopped");
} catch (Exception e) {

Loading…
Cancel
Save