From 3cb93300177c246e2a903a4e8f6ea18f9fad1aa4 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 18 Nov 2024 09:21:04 +0800 Subject: [PATCH] [Improvement] Optimize the AbstractHAServer implementation (#16810) Co-authored-by: xiangzihao <460888207@qq.com> --- .../dolphinscheduler/alert/AlertServer.java | 44 +++++++++++- .../alert/service/AlertBootstrapService.java | 30 +------- .../alert/service/AlertHAServer.java | 15 +++- .../registry/api/enums/RegistryNodeType.java | 6 +- .../registry/api/ha/AbstractHAServer.java | 72 +++++++++++-------- .../AbstractServerStatusChangeListener.java | 1 - .../registry/api/ha/HAServer.java | 5 +- 7 files changed, 106 insertions(+), 67 deletions(-) diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index ff2e985426..7a6d941b70 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -18,7 +18,11 @@ package org.apache.dolphinscheduler.alert; import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics; +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient; +import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer; import org.apache.dolphinscheduler.alert.service.AlertBootstrapService; +import org.apache.dolphinscheduler.alert.service.AlertHAServer; import org.apache.dolphinscheduler.common.CommonConfiguration; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; @@ -26,6 +30,7 @@ import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.dao.DaoConfiguration; import org.apache.dolphinscheduler.registry.api.RegistryConfiguration; +import org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -44,6 +49,18 @@ import org.springframework.context.annotation.Import; @SpringBootApplication public class AlertServer { + @Autowired + private AlertRpcServer alertRpcServer; + + @Autowired + private AlertPluginManager alertPluginManager; + + @Autowired + private AlertRegistryClient alertRegistryClient; + + @Autowired + private AlertHAServer alertHAServer; + @Autowired private AlertBootstrapService alertBootstrapService; @@ -58,7 +75,25 @@ public class AlertServer { public void run() { ServerLifeCycleManager.toRunning(); log.info("AlertServer is staring ..."); - alertBootstrapService.start(); + alertPluginManager.start(); + alertRpcServer.start(); + alertRegistryClient.start(); + + alertHAServer.addServerStatusChangeListener(new AbstractServerStatusChangeListener() { + + @Override + public void changeToActive() { + alertBootstrapService.start(); + } + + @Override + public void changeToStandBy() { + close(); + } + }); + + alertHAServer.start(); + log.info("AlertServer is started ..."); } @@ -73,7 +108,12 @@ public class AlertServer { return; } log.info("AlertServer is stopping, cause: {}", cause); - alertBootstrapService.close(); + try ( + final AlertRpcServer ignore = alertRpcServer; + final AlertRegistryClient ignore1 = alertRegistryClient; + final AlertHAServer ignore2 = alertHAServer; + final AlertBootstrapService ignore3 = alertBootstrapService;) { + } // thread sleep 3 seconds for thread quietly stop ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); log.info("AlertServer stopped, cause: {}", cause); diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java index 1da1b8180f..2cefe896d3 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.alert.service; -import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient; import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer; @@ -32,39 +31,21 @@ import org.springframework.stereotype.Service; @Service public final class AlertBootstrapService implements AutoCloseable { - private final AlertRpcServer alertRpcServer; - - private final AlertRegistryClient alertRegistryClient; - - private final AlertPluginManager alertPluginManager; - - private final AlertHAServer alertHAServer; - private final AlertEventFetcher alertEventFetcher; private final AlertEventLoop alertEventLoop; public AlertBootstrapService(AlertRpcServer alertRpcServer, AlertRegistryClient alertRegistryClient, - AlertPluginManager alertPluginManager, AlertHAServer alertHAServer, AlertEventFetcher alertEventFetcher, AlertEventLoop alertEventLoop) { - this.alertRpcServer = alertRpcServer; - this.alertRegistryClient = alertRegistryClient; - this.alertPluginManager = alertPluginManager; - this.alertHAServer = alertHAServer; this.alertEventFetcher = alertEventFetcher; this.alertEventLoop = alertEventLoop; } public void start() { log.info("AlertBootstrapService starting..."); - alertPluginManager.start(); - alertRpcServer.start(); - alertRegistryClient.start(); - alertHAServer.start(); - alertEventFetcher.start(); alertEventLoop.start(); log.info("AlertBootstrapService started..."); @@ -73,15 +54,8 @@ public final class AlertBootstrapService implements AutoCloseable { @Override public void close() { log.info("AlertBootstrapService stopping..."); - try ( - AlertRpcServer closedAlertRpcServer = alertRpcServer; - AlertRegistryClient closedAlertRegistryClient = alertRegistryClient) { - // close resource - alertEventFetcher.shutdown(); - - alertEventLoop.shutdown(); - alertHAServer.shutdown(); - } + alertEventFetcher.shutdown(); + alertEventLoop.shutdown(); log.info("AlertBootstrapService stopped..."); } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java index 998bc655c4..67382ac470 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.alert.service; +import org.apache.dolphinscheduler.alert.config.AlertConfig; import org.apache.dolphinscheduler.registry.api.Registry; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer; @@ -29,8 +30,18 @@ import org.springframework.stereotype.Component; @Component public class AlertHAServer extends AbstractHAServer { - public AlertHAServer(Registry registry) { - super(registry, RegistryNodeType.ALERT_LOCK.getRegistryPath()); + public AlertHAServer(final Registry registry, final AlertConfig alertConfig) { + super(registry, RegistryNodeType.ALERT_HA_LEADER.getRegistryPath(), alertConfig.getAlertServerAddress()); } + @Override + public void start() { + super.start(); + log.info("AlertHAServer started..."); + } + + @Override + public void close() { + log.info("AlertHAServer shutdown..."); + } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java index 7aaa5baa40..31a0f8d5aa 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java @@ -24,15 +24,15 @@ import lombok.Getter; @AllArgsConstructor public enum RegistryNodeType { - ALL_SERVERS("nodes", "/nodes"), MASTER("Master", "/nodes/master"), - MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"), MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"), MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"), MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator", "/lock/master-serial-workflow-coordinator"), + WORKER("Worker", "/nodes/worker"), + ALERT_SERVER("AlertServer", "/nodes/alert-server"), - ALERT_LOCK("AlertNodeLock", "/lock/alert"); + ALERT_HA_LEADER("AlertHALeader", "/nodes/alert-server-ha-leader"); private final String name; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java index 5dca5552b3..822b4d93f4 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java @@ -17,13 +17,12 @@ package org.apache.dolphinscheduler.registry.api.ha; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import static com.google.common.base.Preconditions.checkNotNull; + import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.Registry; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -34,38 +33,41 @@ public abstract class AbstractHAServer implements HAServer { private final Registry registry; - private final String serverPath; + private final String selectorPath; + + private final String serverIdentify; private ServerStatus serverStatus; private final List serverStatusChangeListeners; - public AbstractHAServer(Registry registry, String serverPath) { + public AbstractHAServer(final Registry registry, final String selectorPath, final String serverIdentify) { this.registry = registry; - this.serverPath = serverPath; + this.selectorPath = checkNotNull(selectorPath); + this.serverIdentify = checkNotNull(serverIdentify); this.serverStatus = ServerStatus.STAND_BY; this.serverStatusChangeListeners = Lists.newArrayList(new DefaultServerStatusChangeListener()); } @Override public void start() { - registry.subscribe(serverPath, event -> { + registry.subscribe(selectorPath, event -> { if (Event.Type.REMOVE.equals(event.type())) { - if (isActive() && !participateElection()) { + if (serverIdentify.equals(event.data())) { statusChange(ServerStatus.STAND_BY); + } else { + if (participateElection()) { + statusChange(ServerStatus.ACTIVE); + } } } }); - ScheduledExecutorService electionSelectionThread = - ThreadUtils.newSingleDaemonScheduledExecutorService("election-selection-thread"); - electionSelectionThread.schedule(() -> { - if (isActive()) { - return; - } - if (participateElection()) { - statusChange(ServerStatus.ACTIVE); - } - }, 10, TimeUnit.SECONDS); + + if (participateElection()) { + statusChange(ServerStatus.ACTIVE); + } else { + log.info("Server {} is standby", serverIdentify); + } } @Override @@ -75,7 +77,22 @@ public abstract class AbstractHAServer implements HAServer { @Override public boolean participateElection() { - return registry.acquireLock(serverPath, 3_000); + final String electionLock = selectorPath + "-lock"; + try { + if (registry.acquireLock(electionLock)) { + if (!registry.exists(selectorPath)) { + registry.put(selectorPath, serverIdentify, true); + return true; + } + return serverIdentify.equals(registry.get(selectorPath)); + } + return false; + } catch (Exception e) { + log.error("participate election error", e); + return false; + } finally { + registry.releaseLock(electionLock); + } } @Override @@ -88,18 +105,15 @@ public abstract class AbstractHAServer implements HAServer { return serverStatus; } - @Override - public void shutdown() { - if (isActive()) { - registry.releaseLock(serverPath); - } - } - private void statusChange(ServerStatus targetStatus) { + final ServerStatus originStatus = serverStatus; + serverStatus = targetStatus; synchronized (this) { - ServerStatus originStatus = serverStatus; - serverStatus = targetStatus; - serverStatusChangeListeners.forEach(listener -> listener.change(originStatus, serverStatus)); + try { + serverStatusChangeListeners.forEach(listener -> listener.change(originStatus, serverStatus)); + } catch (Exception ex) { + log.error("Trigger ServerStatusChangeListener from {} -> {} error", originStatus, targetStatus, ex); + } } } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java index f2e332ea20..8b74e08fcc 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java @@ -24,7 +24,6 @@ public abstract class AbstractServerStatusChangeListener implements ServerStatus @Override public void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus) { - log.info("The status change from {} to {}.", originStatus, currentStatus); if (originStatus == HAServer.ServerStatus.ACTIVE) { if (currentStatus == HAServer.ServerStatus.STAND_BY) { changeToStandBy(); diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java index 6a79e6eb84..4fc7929cc8 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java @@ -21,7 +21,7 @@ package org.apache.dolphinscheduler.registry.api.ha; * Interface for HA server, used to select a active server from multiple servers. * In HA mode, there are multiple servers, only one server is active, others are standby. */ -public interface HAServer { +public interface HAServer extends AutoCloseable { /** * Start the server. @@ -57,7 +57,8 @@ public interface HAServer { /** * Shutdown the server, release resources. */ - void shutdown(); + @Override + void close(); enum ServerStatus { ACTIVE,