Browse Source

[Improvement] Optimize the AbstractHAServer implementation (#16810)

Co-authored-by: xiangzihao <460888207@qq.com>
dev
Wenjun Ruan 6 days ago committed by GitHub
parent
commit
3cb9330017
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 42
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  2. 26
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
  3. 15
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java
  4. 6
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
  5. 64
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java
  6. 1
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java
  7. 5
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java

42
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -18,7 +18,11 @@
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.AlertHAServer;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
@ -26,6 +30,7 @@ import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
import org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -44,6 +49,18 @@ import org.springframework.context.annotation.Import;
@SpringBootApplication
public class AlertServer {
@Autowired
private AlertRpcServer alertRpcServer;
@Autowired
private AlertPluginManager alertPluginManager;
@Autowired
private AlertRegistryClient alertRegistryClient;
@Autowired
private AlertHAServer alertHAServer;
@Autowired
private AlertBootstrapService alertBootstrapService;
@ -58,7 +75,25 @@ public class AlertServer {
public void run() {
ServerLifeCycleManager.toRunning();
log.info("AlertServer is staring ...");
alertPluginManager.start();
alertRpcServer.start();
alertRegistryClient.start();
alertHAServer.addServerStatusChangeListener(new AbstractServerStatusChangeListener() {
@Override
public void changeToActive() {
alertBootstrapService.start();
}
@Override
public void changeToStandBy() {
close();
}
});
alertHAServer.start();
log.info("AlertServer is started ...");
}
@ -73,7 +108,12 @@ public class AlertServer {
return;
}
log.info("AlertServer is stopping, cause: {}", cause);
alertBootstrapService.close();
try (
final AlertRpcServer ignore = alertRpcServer;
final AlertRegistryClient ignore1 = alertRegistryClient;
final AlertHAServer ignore2 = alertHAServer;
final AlertBootstrapService ignore3 = alertBootstrapService;) {
}
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
log.info("AlertServer stopped, cause: {}", cause);

26
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
@ -32,39 +31,21 @@ import org.springframework.stereotype.Service;
@Service
public final class AlertBootstrapService implements AutoCloseable {
private final AlertRpcServer alertRpcServer;
private final AlertRegistryClient alertRegistryClient;
private final AlertPluginManager alertPluginManager;
private final AlertHAServer alertHAServer;
private final AlertEventFetcher alertEventFetcher;
private final AlertEventLoop alertEventLoop;
public AlertBootstrapService(AlertRpcServer alertRpcServer,
AlertRegistryClient alertRegistryClient,
AlertPluginManager alertPluginManager,
AlertHAServer alertHAServer,
AlertEventFetcher alertEventFetcher,
AlertEventLoop alertEventLoop) {
this.alertRpcServer = alertRpcServer;
this.alertRegistryClient = alertRegistryClient;
this.alertPluginManager = alertPluginManager;
this.alertHAServer = alertHAServer;
this.alertEventFetcher = alertEventFetcher;
this.alertEventLoop = alertEventLoop;
}
public void start() {
log.info("AlertBootstrapService starting...");
alertPluginManager.start();
alertRpcServer.start();
alertRegistryClient.start();
alertHAServer.start();
alertEventFetcher.start();
alertEventLoop.start();
log.info("AlertBootstrapService started...");
@ -73,15 +54,8 @@ public final class AlertBootstrapService implements AutoCloseable {
@Override
public void close() {
log.info("AlertBootstrapService stopping...");
try (
AlertRpcServer closedAlertRpcServer = alertRpcServer;
AlertRegistryClient closedAlertRegistryClient = alertRegistryClient) {
// close resource
alertEventFetcher.shutdown();
alertEventLoop.shutdown();
alertHAServer.shutdown();
}
log.info("AlertBootstrapService stopped...");
}
}

15
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
@ -29,8 +30,18 @@ import org.springframework.stereotype.Component;
@Component
public class AlertHAServer extends AbstractHAServer {
public AlertHAServer(Registry registry) {
super(registry, RegistryNodeType.ALERT_LOCK.getRegistryPath());
public AlertHAServer(final Registry registry, final AlertConfig alertConfig) {
super(registry, RegistryNodeType.ALERT_HA_LEADER.getRegistryPath(), alertConfig.getAlertServerAddress());
}
@Override
public void start() {
super.start();
log.info("AlertHAServer started...");
}
@Override
public void close() {
log.info("AlertHAServer shutdown...");
}
}

6
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java

@ -24,15 +24,15 @@ import lombok.Getter;
@AllArgsConstructor
public enum RegistryNodeType {
ALL_SERVERS("nodes", "/nodes"),
MASTER("Master", "/nodes/master"),
MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"),
MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator", "/lock/master-serial-workflow-coordinator"),
WORKER("Worker", "/nodes/worker"),
ALERT_SERVER("AlertServer", "/nodes/alert-server"),
ALERT_LOCK("AlertNodeLock", "/lock/alert");
ALERT_HA_LEADER("AlertHALeader", "/nodes/alert-server-ha-leader");
private final String name;

64
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java

@ -17,13 +17,12 @@
package org.apache.dolphinscheduler.registry.api.ha;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@ -34,38 +33,41 @@ public abstract class AbstractHAServer implements HAServer {
private final Registry registry;
private final String serverPath;
private final String selectorPath;
private final String serverIdentify;
private ServerStatus serverStatus;
private final List<ServerStatusChangeListener> serverStatusChangeListeners;
public AbstractHAServer(Registry registry, String serverPath) {
public AbstractHAServer(final Registry registry, final String selectorPath, final String serverIdentify) {
this.registry = registry;
this.serverPath = serverPath;
this.selectorPath = checkNotNull(selectorPath);
this.serverIdentify = checkNotNull(serverIdentify);
this.serverStatus = ServerStatus.STAND_BY;
this.serverStatusChangeListeners = Lists.newArrayList(new DefaultServerStatusChangeListener());
}
@Override
public void start() {
registry.subscribe(serverPath, event -> {
registry.subscribe(selectorPath, event -> {
if (Event.Type.REMOVE.equals(event.type())) {
if (isActive() && !participateElection()) {
if (serverIdentify.equals(event.data())) {
statusChange(ServerStatus.STAND_BY);
} else {
if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
}
}
});
ScheduledExecutorService electionSelectionThread =
ThreadUtils.newSingleDaemonScheduledExecutorService("election-selection-thread");
electionSelectionThread.schedule(() -> {
if (isActive()) {
return;
}
});
if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
} else {
log.info("Server {} is standby", serverIdentify);
}
}, 10, TimeUnit.SECONDS);
}
@Override
@ -75,7 +77,22 @@ public abstract class AbstractHAServer implements HAServer {
@Override
public boolean participateElection() {
return registry.acquireLock(serverPath, 3_000);
final String electionLock = selectorPath + "-lock";
try {
if (registry.acquireLock(electionLock)) {
if (!registry.exists(selectorPath)) {
registry.put(selectorPath, serverIdentify, true);
return true;
}
return serverIdentify.equals(registry.get(selectorPath));
}
return false;
} catch (Exception e) {
log.error("participate election error", e);
return false;
} finally {
registry.releaseLock(electionLock);
}
}
@Override
@ -88,18 +105,15 @@ public abstract class AbstractHAServer implements HAServer {
return serverStatus;
}
@Override
public void shutdown() {
if (isActive()) {
registry.releaseLock(serverPath);
}
}
private void statusChange(ServerStatus targetStatus) {
synchronized (this) {
ServerStatus originStatus = serverStatus;
final ServerStatus originStatus = serverStatus;
serverStatus = targetStatus;
synchronized (this) {
try {
serverStatusChangeListeners.forEach(listener -> listener.change(originStatus, serverStatus));
} catch (Exception ex) {
log.error("Trigger ServerStatusChangeListener from {} -> {} error", originStatus, targetStatus, ex);
}
}
}
}

1
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java

@ -24,7 +24,6 @@ public abstract class AbstractServerStatusChangeListener implements ServerStatus
@Override
public void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus) {
log.info("The status change from {} to {}.", originStatus, currentStatus);
if (originStatus == HAServer.ServerStatus.ACTIVE) {
if (currentStatus == HAServer.ServerStatus.STAND_BY) {
changeToStandBy();

5
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java

@ -21,7 +21,7 @@ package org.apache.dolphinscheduler.registry.api.ha;
* Interface for HA server, used to select a active server from multiple servers.
* In HA mode, there are multiple servers, only one server is active, others are standby.
*/
public interface HAServer {
public interface HAServer extends AutoCloseable {
/**
* Start the server.
@ -57,7 +57,8 @@ public interface HAServer {
/**
* Shutdown the server, release resources.
*/
void shutdown();
@Override
void close();
enum ServerStatus {
ACTIVE,

Loading…
Cancel
Save