From 2be1d4bf0add2485d4bf322aef7bbaeabfc223de Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 15 Jul 2022 20:06:53 +0800 Subject: [PATCH] Fix worker cannot shutdown due to resource close failed or heart beat check failed (#10979) * Use try-with-resource to close resource, and add heart error threshold to avoid worker cannot close due to heart beat check failed * Move heartbeat error threshold to applicaiton.yml --- .../dolphinscheduler/common/Constants.java | 1 + .../common/thread/ThreadUtils.java | 14 ++++--- .../server/master/MasterServer.java | 38 ++++++++--------- .../server/master/config/MasterConfig.java | 7 ++++ .../master/registry/MasterRegistryClient.java | 16 ++++---- .../runner/MasterSchedulerBootstrap.java | 3 +- .../src/main/resources/application.yaml | 2 + .../server/registry/HeartBeatTask.java | 19 +++++++-- .../bean/SpringApplicationContext.java | 3 +- .../dolphinscheduler/StandaloneServer.java | 32 +++++++++++++-- .../src/main/resources/application.yaml | 4 ++ .../server/worker/WorkerServer.java | 41 ++++++------------- .../server/worker/config/WorkerConfig.java | 7 ++++ .../worker/registry/WorkerRegistryClient.java | 36 +++++++++------- .../src/main/resources/application.yaml | 2 + 15 files changed, 142 insertions(+), 83 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 099620778e..7a84e95a13 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -50,6 +50,7 @@ public final class Constants { public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters"; public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers"; public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters"; + public static final String FORMAT_SS = "%s%s"; public static final String FORMAT_S_S = "%s/%s"; public static final String FORMAT_S_S_COLON = "%s:%s"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java index 5c8020b7cd..f4f2a17bc7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java @@ -21,12 +21,18 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.experimental.UtilityClass; @UtilityClass public class ThreadUtils { + + private static final Logger logger = LoggerFactory.getLogger(ThreadUtils.class); + /** * Wrapper over newDaemonFixedThreadExecutor. * @@ -35,10 +41,7 @@ public class ThreadUtils { * @return ExecutorService */ public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) { - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat(threadName) - .build(); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build(); return Executors.newFixedThreadPool(threadsNum, threadFactory); } @@ -48,8 +51,9 @@ public class ThreadUtils { public static void sleep(final long millis) { try { Thread.sleep(millis); - } catch (final InterruptedException ignore) { + } catch (final InterruptedException interruptedException) { Thread.currentThread().interrupt(); + logger.error("Current thread sleep error", interruptedException); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 3469cd4852..0bf3c945e7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -115,32 +115,28 @@ public class MasterServer implements IStoppable { * @param cause close cause */ public void close(String cause) { - - try { - // set stop signal is true - // execute only once - if (!Stopper.stop()) { - logger.warn("MasterServer is already stopped, current cause: {}", cause); - return; - } + // set stop signal is true + // execute only once + if (!Stopper.stop()) { + logger.warn("MasterServer is already stopped, current cause: {}", cause); + return; + } + // thread sleep 3 seconds for thread quietly stop + ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); + try (SchedulerApi closedSchedulerApi = schedulerApi; + MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap; + MasterRPCServer closedRpcServer = masterRPCServer; + MasterRegistryClient closedMasterRegistryClient = masterRegistryClient; + // close spring Context and will invoke method with @PreDestroy annotation to destroy beans. + // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc + SpringApplicationContext closedSpringContext = springApplicationContext) { logger.info("Master server is stopping, current cause : {}", cause); - - // thread sleep 3 seconds for thread quietly stop - ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); - // close - this.schedulerApi.close(); - this.masterSchedulerBootstrap.close(); - this.masterRPCServer.close(); - this.masterRegistryClient.closeRegistry(); - // close spring Context and will invoke method with @PreDestroy annotation to destroy beans. - // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc - springApplicationContext.close(); - - logger.info("MasterServer stopped, current cause: {}", cause); } catch (Exception e) { logger.error("MasterServer stop failed, current cause: {}", cause, e); + return; } + logger.info("MasterServer stopped, current cause: {}", cause); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index fe51e20984..7f6f124164 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -67,6 +67,10 @@ public class MasterConfig implements Validator { * Master heart beat task execute interval. */ private Duration heartbeatInterval = Duration.ofSeconds(10); + /** + * Master heart beat task error threshold, if the continuous error count exceed this count, the master will close. + */ + private int heartbeatErrorThreshold = 5; /** * task submit max retry times. */ @@ -129,6 +133,9 @@ public class MasterConfig implements Validator { if (masterConfig.getMaxCpuLoadAvg() <= 0) { masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); } + if (masterConfig.getHeartbeatErrorThreshold() <= 0) { + errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value"); + } masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort())); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 359fdc745e..aa536bae49 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -53,7 +53,7 @@ import com.google.common.collect.Sets; *

When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry. */ @Component -public class MasterRegistryClient { +public class MasterRegistryClient implements AutoCloseable { /** * logger @@ -107,7 +107,8 @@ public class MasterRegistryClient { registryClient.setStoppable(stoppable); } - public void closeRegistry() { + @Override + public void close() { // TODO unsubscribe MasterRegistryDataListener deregister(); } @@ -189,11 +190,12 @@ public class MasterRegistryClient { String localNodePath = getCurrentNodePath(); Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, - masterConfig.getMaxCpuLoadAvg(), - masterConfig.getReservedMemory(), - Sets.newHashSet(localNodePath), - Constants.MASTER_TYPE, - registryClient); + masterConfig.getMaxCpuLoadAvg(), + masterConfig.getReservedMemory(), + Sets.newHashSet(localNodePath), + Constants.MASTER_TYPE, + registryClient, + masterConfig.getHeartbeatErrorThreshold()); // remove before persist registryClient.remove(localNodePath); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 870cc1b24c..aa0542cb7c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -58,7 +58,7 @@ import org.springframework.stereotype.Service; * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed. */ @Service -public class MasterSchedulerBootstrap extends BaseDaemonThread { +public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerBootstrap.class); @@ -116,6 +116,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread { logger.info("Master schedule bootstrap started..."); } + @Override public void close() { logger.info("Master schedule bootstrap stopping..."); logger.info("Master schedule bootstrap stopped..."); diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index 7a9fa8fcde..9f191ccd25 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -98,6 +98,8 @@ master: host-selector: lower_weight # master heartbeat interval heartbeat-interval: 10s + # Master heart beat task error threshold, if the continuous error count exceed this count, the master will close. + heartbeat-error-threshold: 5 # master commit task retry times task-commit-retry-times: 5 # master commit task interval diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index 129aaff0de..c84abb4182 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.HeartBeat; import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,16 +39,22 @@ public class HeartBeatTask implements Runnable { private final String serverType; private final HeartBeat heartBeat; + private final int heartBeatErrorThreshold; + + private final AtomicInteger heartBeatErrorTimes = new AtomicInteger(); + public HeartBeatTask(long startupTime, double maxCpuloadAvg, double reservedMemory, Set heartBeatPaths, String serverType, - RegistryClient registryClient) { + RegistryClient registryClient, + int heartBeatErrorThreshold) { this.heartBeatPaths = heartBeatPaths; this.registryClient = registryClient; this.serverType = serverType; this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory); + this.heartBeatErrorThreshold = heartBeatErrorThreshold; } public HeartBeatTask(long startupTime, @@ -58,13 +65,14 @@ public class HeartBeatTask implements Runnable { String serverType, RegistryClient registryClient, int workerThreadCount, - int workerWaitingTaskCount - ) { + int workerWaitingTaskCount, + int heartBeatErrorThreshold) { this.heartBeatPaths = heartBeatPaths; this.registryClient = registryClient; this.workerWaitingTaskCount = workerWaitingTaskCount; this.serverType = serverType; this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount); + this.heartBeatErrorThreshold = heartBeatErrorThreshold; } public String getHeartBeatInfo() { @@ -88,8 +96,13 @@ public class HeartBeatTask implements Runnable { for (String heartBeatPath : heartBeatPaths) { registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat()); } + heartBeatErrorTimes.set(0); } catch (Throwable ex) { logger.error("HeartBeat task execute failed", ex); + if (heartBeatErrorTimes.incrementAndGet() >= heartBeatErrorThreshold) { + registryClient.getStoppable() + .stop("HeartBeat task connect to zk failed too much times: " + heartBeatErrorTimes); + } } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java index 61dfcb35d7..5b37b1f72d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java @@ -24,7 +24,7 @@ import org.springframework.context.support.AbstractApplicationContext; import org.springframework.stereotype.Component; @Component -public class SpringApplicationContext implements ApplicationContextAware { +public class SpringApplicationContext implements ApplicationContextAware, AutoCloseable { private static ApplicationContext applicationContext; @@ -36,6 +36,7 @@ public class SpringApplicationContext implements ApplicationContextAware { /** * Close this application context, destroying all beans in its bean factory. */ + @Override public void close() { ((AbstractApplicationContext)applicationContext).close(); } diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java index 3866c53066..728fed8eaa 100644 --- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java +++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java @@ -19,15 +19,41 @@ package org.apache.dolphinscheduler; import org.apache.curator.test.TestingServer; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.event.ApplicationFailedEvent; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; + +import lombok.NonNull; @SpringBootApplication -public class StandaloneServer { +public class StandaloneServer implements ApplicationListener { + + private static final Logger logger = LoggerFactory.getLogger(StandaloneServer.class); + + private static TestingServer zookeeperServer; public static void main(String[] args) throws Exception { - final TestingServer server = new TestingServer(true); - System.setProperty("registry.zookeeper.connect-string", server.getConnectString()); + zookeeperServer = new TestingServer(true); + System.setProperty("registry.zookeeper.connect-string", zookeeperServer.getConnectString()); SpringApplication.run(StandaloneServer.class, args); } + + @Override + public void onApplicationEvent(@NonNull ApplicationEvent event) { + if (event instanceof ApplicationFailedEvent || event instanceof ContextClosedEvent) { + try (TestingServer closedServer = zookeeperServer) { + // close the zookeeper server + logger.info("Receive spring context close event: {}, will closed zookeeper server", event); + } catch (IOException e) { + logger.error("Close zookeeper server error", e); + } + } + } } diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 6c5171e59d..5640467616 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -133,6 +133,8 @@ master: host-selector: lower_weight # master heartbeat interval heartbeat-interval: 10s + # Master heart beat task error threshold, if the continuous error count exceed this count, the master will close. + heartbeat-error-threshold: 5 # master commit task retry times task-commit-retry-times: 5 # master commit task interval @@ -154,6 +156,8 @@ worker: exec-threads: 10 # worker heartbeat interval heartbeat-interval: 10s + # Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close. + heartbeat-error-threshold: 5 # worker host weight to dispatch tasks, default value 100 host-weight: 100 # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index d97a994249..d03a2a951b 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.commons.collections4.CollectionUtils; import java.util.Collection; -import java.util.Set; import javax.annotation.PostConstruct; @@ -121,8 +120,7 @@ public class WorkerServer implements IStoppable { this.workerRegistryClient.registry(); this.workerRegistryClient.setRegistryStoppable(this); - Set workerZkPaths = this.workerRegistryClient.getWorkerZkPaths(); - this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); + this.workerRegistryClient.handleDeadServer(); this.workerManagerThread.start(); @@ -139,37 +137,24 @@ public class WorkerServer implements IStoppable { } public void close(String cause) { - try { - // execute only once - // set stop signal is true - if (!Stopper.stop()) { - logger.warn("WorkerServer is already stopped, current cause: {}", cause); - return; - } + if (!Stopper.stop()) { + logger.warn("WorkerServer is already stopped, current cause: {}", cause); + return; + } + ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); + try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer; + WorkerRegistryClient closedRegistryClient = workerRegistryClient; + AlertClientService closedAlertClientService = alertClientService; + SpringApplicationContext closedSpringContext = springApplicationContext;) { logger.info("Worker server is stopping, current cause : {}", cause); - - try { - // thread sleep 3 seconds for thread quitely stop - Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); - } catch (Exception e) { - logger.warn("Worker server close wait error", e); - } - - // close - this.workerRpcServer.close(); - this.workerRegistryClient.unRegistry(); - this.alertClientService.close(); - // kill running tasks this.killAllRunningTasks(); - - // close the application context - this.springApplicationContext.close(); - logger.info("Worker server stopped, current cause: {}", cause); } catch (Exception e) { logger.error("Worker server stop failed, current cause: {}", cause, e); + return; } + logger.info("Worker server stopped, current cause: {}", cause); } @Override diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index aab162d6d4..2367975715 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -40,6 +40,10 @@ public class WorkerConfig implements Validator { private int listenPort = 1234; private int execThreads = 10; private Duration heartbeatInterval = Duration.ofSeconds(10); + /** + * Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close. + */ + private int heartbeatErrorThreshold = 5; private int hostWeight = 100; private boolean tenantAutoCreate = true; private boolean tenantDistributedUser = false; @@ -70,6 +74,9 @@ public class WorkerConfig implements Validator { if (workerConfig.getMaxCpuLoadAvg() <= 0) { workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); } + if (workerConfig.getHeartbeatErrorThreshold() <= 0) { + errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value"); + } workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort())); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index b94cdaf317..96d41c38c0 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -54,7 +54,7 @@ import com.google.common.collect.Sets; * worker registry */ @Service -public class WorkerRegistryClient { +public class WorkerRegistryClient implements AutoCloseable { private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class); @@ -101,15 +101,15 @@ public class WorkerRegistryClient { long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, - workerConfig.getMaxCpuLoadAvg(), - workerConfig.getReservedMemory(), - workerConfig.getHostWeight(), - workerZkPaths, - Constants.WORKER_TYPE, - registryClient, - workerConfig.getExecThreads(), - workerManagerThread.getThreadPoolQueueSize() - ); + workerConfig.getMaxCpuLoadAvg(), + workerConfig.getReservedMemory(), + workerConfig.getHostWeight(), + workerZkPaths, + Constants.WORKER_TYPE, + registryClient, + workerConfig.getExecThreads(), + workerManagerThread.getThreadPoolQueueSize(), + workerConfig.getHeartbeatErrorThreshold()); for (String workerZKPath : workerZkPaths) { // remove before persist @@ -147,8 +147,10 @@ public class WorkerRegistryClient { logger.error("remove worker zk path exception", ex); } - this.heartBeatExecutor.shutdownNow(); - logger.info("heartbeat executor shutdown"); + if (heartBeatExecutor != null) { + heartBeatExecutor.shutdownNow(); + logger.info("Heartbeat executor shutdown"); + } registryClient.close(); logger.info("registry client closed"); @@ -175,8 +177,9 @@ public class WorkerRegistryClient { return workerPaths; } - public void handleDeadServer(Set nodeSet, NodeType nodeType, String opType) { - registryClient.handleDeadServer(nodeSet, nodeType, opType); + public void handleDeadServer() { + Set workerZkPaths = getWorkerZkPaths(); + registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); } /** @@ -190,4 +193,9 @@ public class WorkerRegistryClient { registryClient.setStoppable(stoppable); } + @Override + public void close() throws IOException { + unRegistry(); + } + } diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml index ad390ed6af..a9c3eadf3c 100644 --- a/dolphinscheduler-worker/src/main/resources/application.yaml +++ b/dolphinscheduler-worker/src/main/resources/application.yaml @@ -60,6 +60,8 @@ worker: exec-threads: 100 # worker heartbeat interval heartbeat-interval: 10s + # Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close. + heartbeat-error-threshold: 5 # worker host weight to dispatch tasks, default value 100 host-weight: 100 # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.