DolphinScheduler master register client, used to connect to registry and hand the registry events.
@@ -55,48 +49,36 @@ import com.google.common.collect.Sets;
@Component
public class MasterRegistryClient implements AutoCloseable {
- /**
- * logger
- */
private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class);
- /**
- * failover service
- */
@Autowired
private FailoverService failoverService;
@Autowired
private RegistryClient registryClient;
- /**
- * master config
- */
@Autowired
private MasterConfig masterConfig;
- /**
- * heartbeat executor
- */
+ @Autowired
+ private MasterConnectStrategy masterConnectStrategy;
+
private ScheduledExecutorService heartBeatExecutor;
/**
* master startup time, ms
*/
private long startupTime;
- private String masterAddress;
-
- public void init() {
- this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
- this.startupTime = System.currentTimeMillis();
- this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
- }
public void start() {
try {
+ this.startupTime = System.currentTimeMillis();
+ this.heartBeatExecutor =
+ Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
// master registry
registry();
- registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(), registryClient));
+ registryClient.addConnectionStateListener(
+ new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
throw new RegistryException("Master registry client start up error", e);
@@ -137,11 +119,8 @@ public class MasterRegistryClient implements AutoCloseable {
try {
if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
- // handle dead server
- registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
-
- //failover server
+ // failover server
if (failover) {
failoverService.failoverServerWhenDown(serverHost, nodeType);
}
@@ -169,11 +148,9 @@ public class MasterRegistryClient implements AutoCloseable {
}
if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
- // handle dead server
- registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
}
- //failover server
+ // failover server
if (failover) {
failoverService.failoverServerWhenDown(serverHost, nodeType);
}
@@ -186,16 +163,14 @@ public class MasterRegistryClient implements AutoCloseable {
* Registry the current master server itself to registry.
*/
void registry() {
- logger.info("Master node : {} registering to registry center", masterAddress);
- String localNodePath = getCurrentNodePath();
+ logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
+ String localNodePath = masterConfig.getMasterRegistryNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- masterConfig.getMaxCpuLoadAvg(),
- masterConfig.getReservedMemory(),
- Sets.newHashSet(localNodePath),
- Constants.MASTER_TYPE,
- registryClient,
- masterConfig.getHeartbeatErrorThreshold());
+ masterConfig.getMaxCpuLoadAvg(),
+ masterConfig.getReservedMemory(),
+ Sets.newHashSet(localNodePath),
+ registryClient);
// remove before persist
registryClient.remove(localNodePath);
@@ -209,19 +184,17 @@ public class MasterRegistryClient implements AutoCloseable {
// sleep 1s, waiting master failover remove
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
- // delete dead server
- registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
-
- this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
- logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
+ this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(),
+ TimeUnit.SECONDS);
+ logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s",
+ masterConfig.getMasterAddress(), masterHeartbeatInterval);
}
public void deregister() {
try {
- String localNodePath = getCurrentNodePath();
- registryClient.remove(localNodePath);
- logger.info("Master node : {} unRegistry to register center.", masterAddress);
+ registryClient.remove(masterConfig.getMasterRegistryNodePath());
+ logger.info("Master node : {} unRegistry to register center.", masterConfig.getMasterAddress());
heartBeatExecutor.shutdown();
logger.info("MasterServer heartbeat executor shutdown");
registryClient.close();
@@ -230,11 +203,4 @@ public class MasterRegistryClient implements AutoCloseable {
}
}
- /**
- * get master path
- */
- private String getCurrentNodePath() {
- return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterAddress;
- }
-
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
new file mode 100644
index 0000000000..1b1f2c84a8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.registry.api.StrategyType;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+
+/**
+ * This strategy will stop the master server, when disconnected from {@link org.apache.dolphinscheduler.registry.api.Registry}.
+ */
+@Service
+@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true)
+public class MasterStopStrategy implements MasterConnectStrategy {
+
+ private final Logger logger = LoggerFactory.getLogger(MasterStopStrategy.class);
+
+ @Autowired
+ private RegistryClient registryClient;
+ @Autowired
+ private MasterConfig masterConfig;
+
+ @Override
+ public void disconnect() {
+ registryClient.getStoppable()
+ .stop("Master disconnected from registry, will stop myself due to the stop strategy");
+ }
+
+ @Override
+ public void reconnect() {
+ logger.warn("The current connect strategy is stop, so the master will not reconnect to registry");
+ }
+
+ @Override
+ public StrategyType getStrategyType() {
+ return StrategyType.STOP;
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
new file mode 100644
index 0000000000..654d96f25b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.lifecycle.ServerStatus;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.StrategyType;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
+import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+
+import java.time.Duration;
+
+/**
+ * This strategy will change the server status to {@link ServerStatus#WAITING} when disconnect from {@link Registry}.
+ */
+@Service
+@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "waiting")
+public class MasterWaitingStrategy implements MasterConnectStrategy {
+
+ private final Logger logger = LoggerFactory.getLogger(MasterWaitingStrategy.class);
+
+ @Autowired
+ private MasterConfig masterConfig;
+ @Autowired
+ private RegistryClient registryClient;
+ @Autowired
+ private MasterRPCServer masterRPCServer;
+ @Autowired
+ private WorkflowEventQueue workflowEventQueue;
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+ @Autowired
+ private StateWheelExecuteThread stateWheelExecuteThread;
+
+ @Override
+ public void disconnect() {
+ try {
+ ServerLifeCycleManager.toWaiting();
+ // todo: clear the current resource
+ clearMasterResource();
+ Duration maxWaitingTime = masterConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
+ try {
+ logger.info("Master disconnect from registry will try to reconnect in {} s",
+ maxWaitingTime.getSeconds());
+ registryClient.connectUntilTimeout(maxWaitingTime);
+ } catch (RegistryException ex) {
+ throw new ServerLifeCycleException(
+ String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex);
+ }
+ } catch (ServerLifeCycleException e) {
+ String errorMessage = String.format(
+ "Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
+ ServerLifeCycleManager.getServerStatus());
+ logger.error(errorMessage, e);
+ registryClient.getStoppable().stop(errorMessage);
+ } catch (RegistryException ex) {
+ String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server";
+ logger.error(errorMessage, ex);
+ registryClient.getStoppable().stop(errorMessage);
+ } catch (Exception ex) {
+ String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server";
+ logger.error(errorMessage, ex);
+ registryClient.getStoppable().stop(errorMessage);
+ }
+ }
+
+ @Override
+ public void reconnect() {
+ try {
+ ServerLifeCycleManager.recoverFromWaiting();
+ reStartMasterResource();
+ // reopen the resource
+ logger.info("Recover from waiting success, the current server status is {}",
+ ServerLifeCycleManager.getServerStatus());
+ } catch (Exception e) {
+ String errorMessage =
+ String.format("Recover from waiting failed, the current server status is %s, will stop the server",
+ ServerLifeCycleManager.getServerStatus());
+ logger.error(errorMessage, e);
+ registryClient.getStoppable().stop(errorMessage);
+ }
+ }
+
+ @Override
+ public StrategyType getStrategyType() {
+ return StrategyType.WAITING;
+ }
+
+ private void clearMasterResource() {
+ // close the worker resource, if close failed should stop the worker server
+ masterRPCServer.close();
+ logger.warn("Master closed RPC server due to lost registry connection");
+ workflowEventQueue.clearWorkflowEventQueue();
+ logger.warn("Master clear workflow event queue due to lost registry connection");
+ processInstanceExecCacheManager.clearCache();
+ logger.warn("Master clear process instance cache due to lost registry connection");
+ stateWheelExecuteThread.clearAllTasks();
+ logger.warn("Master clear all state wheel task due to lost registry connection");
+
+ }
+
+ private void reStartMasterResource() {
+ // reopen the resource, if reopen failed should stop the worker server
+ masterRPCServer.start();
+ logger.warn("Master restarted RPC server due to reconnect to registry");
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index c2a9de0446..7bdebe1ee0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -82,8 +82,8 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private TaskExecuteStartProcessor taskExecuteStartProcessor;
- @PostConstruct
- private void init() {
+ public void start() {
+ logger.info("Starting Master RPC Server...");
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
@@ -106,11 +106,6 @@ public class MasterRPCServer implements AutoCloseable {
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
- }
-
- public void start() {
- logger.info("Starting Master RPC Server...");
- this.nettyRemotingServer.start();
logger.info("Started Master RPC Server...");
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 24a23485ff..14b0252d7c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.StreamTaskInstanceExecCacheManager;
@@ -61,7 +61,7 @@ public class EventExecuteService extends BaseDaemonThread {
@Override
public void run() {
- while (Stopper.isRunning()) {
+ while (!ServerLifeCycleManager.isStopped()) {
try {
workflowEventHandler();
streamTaskEventHandler();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 16656c8760..5546b474d7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -18,13 +18,11 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -60,8 +58,11 @@ public class FailoverExecuteThread extends BaseDaemonThread {
// when startup, wait 10s for ready
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
- while (Stopper.isRunning()) {
+ while (!ServerLifeCycleManager.isStopped()) {
try {
+ if (!ServerLifeCycleManager.isRunning()) {
+ continue;
+ }
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
masterFailoverService.checkMasterFailover();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index aa0542cb7c..5084c3d1cc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -17,10 +17,11 @@
package org.apache.dolphinscheduler.server.master.runner;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -40,8 +41,10 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
-
-import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
@@ -49,11 +52,6 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
/**
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@@ -104,7 +102,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
* constructor of MasterSchedulerService
*/
public void init() {
- this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
+ this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils
+ .newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
}
@@ -127,11 +126,15 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
*/
@Override
public void run() {
- while (Stopper.isRunning()) {
+ while (!ServerLifeCycleManager.isStopped()) {
try {
+ if (!ServerLifeCycleManager.isRunning()) {
+ // the current server is not at running status, cannot consume command.
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ }
// todo: if the workflow event queue is much, we need to handle the back pressure
boolean isOverload =
- OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
+ OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
@@ -156,18 +159,19 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
- logger.error("The workflow instance is already been cached, this case shouldn't be happened");
+ logger.error(
+ "The workflow instance is already been cached, this case shouldn't be happened");
}
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
- processService,
- nettyExecutorManager,
- processAlertManager,
- masterConfig,
- stateWheelExecuteThread,
- curingGlobalParamsService);
+ processService,
+ nettyExecutorManager,
+ processAlertManager,
+ masterConfig,
+ stateWheelExecuteThread,
+ curingGlobalParamsService);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
- processInstance.getId()));
+ processInstance.getId()));
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
@@ -186,24 +190,28 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
private List