From 9856453cd7a9a51ce6934c8b89630f9d025e74a7 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 20 Nov 2024 16:04:11 +0800 Subject: [PATCH] [DSIP-82][Master/Worker] Use FAILOVER_FINISH_NODES to avoid duplicate workflow/task when failover (#16821) --- docs/docs/en/architecture/configuration.md | 1 - docs/docs/en/guide/upgrade/incompatible.md | 1 + docs/docs/zh/architecture/configuration.md | 47 +++-- docs/docs/zh/guide/upgrade/incompatible.md | 1 + .../master/cluster/BaseServerMetadata.java | 5 + .../master/cluster/ClusterStateMonitors.java | 12 +- .../server/master/cluster/IClusters.java | 3 + .../server/master/cluster/MasterClusters.java | 16 +- .../master/cluster/MasterServerMetadata.java | 7 +- .../server/master/cluster/WorkerClusters.java | 6 + .../master/cluster/WorkerServerMetadata.java | 3 +- .../server/master/config/MasterConfig.java | 4 - .../system/event/AbstractSystemEvent.java | 6 + .../system/event/MasterFailoverEvent.java | 24 ++- .../system/event/WorkerFailoverEvent.java | 24 ++- .../master/failover/FailoverCoordinator.java | 130 +++++++++--- .../MasterConnectionStateListener.java | 13 +- .../master/registry/MasterHeartBeatTask.java | 20 +- .../master/registry/MasterRegistryClient.java | 5 +- .../master/registry/MasterStopStrategy.java | 58 ------ .../registry/MasterWaitingStrategy.java | 116 ----------- .../src/main/resources/application.yaml | 3 - .../src/test/resources/application.yaml | 3 - .../registry/api/ConnectStrategy.java | 2 - .../api/ConnectStrategyProperties.java | 31 --- .../registry/api/RegistryClient.java | 33 ++++ .../registry/api/StrategyType.java | 25 --- .../registry/api/enums/RegistryNodeType.java | 2 + .../registry/api/utils/RegistryUtils.java | 15 +- .../server/worker/config/WorkerConfig.java | 3 - .../registry/WorkerConnectStrategy.java | 24 --- .../WorkerConnectionStateListener.java | 16 +- .../worker/registry/WorkerRegistryClient.java | 8 +- .../worker/registry/WorkerStopStrategy.java | 52 ----- .../registry/WorkerWaitingStrategy.java | 131 ------------ .../worker/task/WorkerHeartBeatTask.java | 16 +- .../src/main/resources/application.yaml | 3 - .../WorkerConnectionStateListenerTest.java | 21 +- .../registry/WorkerRegistryClientTest.java | 2 - .../worker/registry/WorkerStrategyTest.java | 187 ------------------ 40 files changed, 304 insertions(+), 775 deletions(-) delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java delete mode 100644 dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java delete mode 100644 dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java => dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java (53%) delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java delete mode 100644 dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index 86d4357e1b..fb8613d388 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -294,7 +294,6 @@ Location: `master-server/conf/application.yaml` | master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. | | master.failover-interval | 10 | failover interval, the unit is minute | | master.kill-application-when-task-failover | true | whether to kill yarn/k8s application when failover taskInstance | -| master.registry-disconnect-strategy.strategy | stop | Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting | | master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely | | master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory | | master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` | diff --git a/docs/docs/en/guide/upgrade/incompatible.md b/docs/docs/en/guide/upgrade/incompatible.md index c263d97ac4..91abd04811 100644 --- a/docs/docs/en/guide/upgrade/incompatible.md +++ b/docs/docs/en/guide/upgrade/incompatible.md @@ -33,4 +33,5 @@ This document records the incompatible updates between each version. You need to * Uniformly name `process` in code as `workflow` ([#16515])(https://github.com/apache/dolphinscheduler/pull/16515) * Deprecated upgrade code of 1.x and 2.x ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543) * Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794) +* Remove the `registry-disconnect-strategy` in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821) diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index 8eed519f78..c76e869f72 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -276,30 +276,28 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId 位置:`master-server/conf/application.yaml` -| 参数 | 默认值 | 描述 | -|-----------------------------------------------------------------------------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------| -| master.listen-port | 5678 | master监听端口 | -| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command | -| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 | -| master.dispatch-task-number | 3 | master每个批次的派发任务数量 | -| master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | Master 将会使用Worker的动态CPU/Memory/线程池使用率来计算Worker的负载,负载越低的worker将会有更高的机会被分发任务 | -| master.max-heartbeat-interval | 10s | master最大心跳间隔 | -| master.task-commit-retry-times | 5 | 任务重试次数 | -| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 | -| master.state-wheel-interval | 5 | 轮询检查状态时间 | -| master.server-load-protection.enabled | true | 是否开启系统保护策略 | -| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU | -| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU | -| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 | -| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 | -| master.failover-interval | 10 | failover间隔,单位为分钟 | -| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application | -| master.registry-disconnect-strategy.strategy | stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting | -| master.registry-disconnect-strategy.max-waiting-time | 100s | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 | -| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 | -| master.command-fetch-strategy.type | ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED` | -| master.command-fetch-strategy.config.id-step | 1 | 数据库中t_ds_command的id自增步长 | -| master.command-fetch-strategy.config.fetch-size | 10 | master拉取command数量 | +| 参数 | 默认值 | 描述 | +|-----------------------------------------------------------------------------|------------------------------|-----------------------------------------------------------------------------------------| +| master.listen-port | 5678 | master监听端口 | +| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command | +| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 | +| master.dispatch-task-number | 3 | master每个批次的派发任务数量 | +| master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | Master 将会使用Worker的动态CPU/Memory/线程池使用率来计算Worker的负载,负载越低的worker将会有更高的机会被分发任务 | +| master.max-heartbeat-interval | 10s | master最大心跳间隔 | +| master.task-commit-retry-times | 5 | 任务重试次数 | +| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 | +| master.state-wheel-interval | 5 | 轮询检查状态时间 | +| master.server-load-protection.enabled | true | 是否开启系统保护策略 | +| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU | +| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU | +| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 | +| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 | +| master.failover-interval | 10 | failover间隔,单位为分钟 | +| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application | +| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 | +| master.command-fetch-strategy.type | ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED` | +| master.command-fetch-strategy.config.id-step | 1 | 数据库中t_ds_command的id自增步长 | +| master.command-fetch-strategy.config.fetch-size | 10 | master拉取command数量 | ## Worker Server相关配置 @@ -319,7 +317,6 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId | worker.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | worker最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,worker服务才能接收任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 | | worker.alert-listen-host | localhost | alert监听host | | worker.alert-listen-port | 50052 | alert监听端口 | -| worker.registry-disconnect-strategy.strategy | stop | 当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting | | worker.registry-disconnect-strategy.max-waiting-time | 100s | 当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Worker与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 | | worker.task-execute-threads-full-policy | REJECT | 如果是 REJECT, 当Worker中等待队列中的任务数达到exec-threads时, Worker将会拒绝接下来新接收的任务,Master将会重新分发该任务; 如果是 CONTINUE, Worker将会接收任务,放入等待队列中等待空闲线程去执行该任务 | | worker.tenant-config.auto-create-tenant-enabled | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 | diff --git a/docs/docs/zh/guide/upgrade/incompatible.md b/docs/docs/zh/guide/upgrade/incompatible.md index f1fb24d9c9..4ba7d4bf95 100644 --- a/docs/docs/zh/guide/upgrade/incompatible.md +++ b/docs/docs/zh/guide/upgrade/incompatible.md @@ -31,4 +31,5 @@ * 统一代码中的 `process` 为 `workflow` ([#16515])(https://github.com/apache/dolphinscheduler/pull/16515) * 废弃从 1.x 至 2.x 的升级代码 ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543) * 移除 `数据质量` 模块 ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794) +* 在`application.yaml`中移除`registry-disconnect-strategy`配置 ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java index 002cd2903f..c9b89eee70 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java @@ -20,12 +20,17 @@ package org.apache.dolphinscheduler.server.master.cluster; import org.apache.dolphinscheduler.common.enums.ServerStatus; import lombok.Data; +import lombok.ToString; import lombok.experimental.SuperBuilder; @Data +@ToString @SuperBuilder public abstract class BaseServerMetadata implements IClusters.IServerMetadata { + // The server startup time in milliseconds. + private final long serverStartupTime; + private final String address; private final double cpuUsage; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterStateMonitors.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterStateMonitors.java index bc78cb80a9..06705a9f39 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterStateMonitors.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterStateMonitors.java @@ -46,12 +46,16 @@ public class ClusterStateMonitors { log.info("ClusterStateMonitors started..."); } - void masterRemoved(MasterServerMetadata masterServer) { - systemEventBus.publish(MasterFailoverEvent.of(masterServer.getAddress(), new Date())); + void masterRemoved(final MasterServerMetadata masterServer) { + // We set a delay of 30 seconds for the master failover event + // If the master can reconnect to registry within 30 seconds, the master will skip failover. + systemEventBus.publish(MasterFailoverEvent.of(masterServer, new Date(), 30_000)); } - void workerRemoved(WorkerServerMetadata workerServer) { - systemEventBus.publish(WorkerFailoverEvent.of(workerServer.getAddress(), new Date())); + void workerRemoved(final WorkerServerMetadata workerServer) { + // We set a delay of 30 seconds for the worker failover event + // If the worker can reconnect to registry within 30 seconds, the worker will skip failover. + systemEventBus.publish(WorkerFailoverEvent.of(workerServer, new Date(), 30_000)); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IClusters.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IClusters.java index 5bcc85324e..3b97ea8529 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IClusters.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IClusters.java @@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.master.cluster; import org.apache.dolphinscheduler.common.enums.ServerStatus; import java.util.List; +import java.util.Optional; public interface IClusters { List getServers(); + Optional getServer(final String address); + void registerListener(IClustersChangeListener listener); interface IServerMetadata { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterClusters.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterClusters.java index 568e8f1dc8..bb7d205c4c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterClusters.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterClusters.java @@ -26,6 +26,7 @@ import org.apache.commons.collections4.list.UnmodifiableList; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -50,6 +51,11 @@ public class MasterClusters extends AbstractClusterSubscribeListener(masterServerMap.values())); } + @Override + public Optional getServer(final String address) { + return Optional.ofNullable(masterServerMap.get(address)); + } + public List getNormalServers() { List normalMasterServers = masterServerMap.values() .stream() @@ -59,12 +65,12 @@ public class MasterClusters extends AbstractClusterSubscribeListener listener) { + public void registerListener(final IClustersChangeListener listener) { masterClusterChangeListeners.add(listener); } @Override - MasterServerMetadata parseServerFromHeartbeat(String masterHeartBeatJson) { + MasterServerMetadata parseServerFromHeartbeat(final String masterHeartBeatJson) { MasterHeartBeat masterHeartBeat = JSONUtils.parseObject(masterHeartBeatJson, MasterHeartBeat.class); if (masterHeartBeat == null) { return null; @@ -73,7 +79,7 @@ public class MasterClusters extends AbstractClusterSubscribeListener listener : masterClusterChangeListeners) { listener.onServerAdded(masterServer); @@ -81,7 +87,7 @@ public class MasterClusters extends AbstractClusterSubscribeListener listener : masterClusterChangeListeners) { listener.onServerRemove(masterServer); @@ -89,7 +95,7 @@ public class MasterClusters extends AbstractClusterSubscribeListener listener : masterClusterChangeListeners) { listener.onServerUpdate(masterServer); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java index 32dd0cad9e..1005de4c50 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java @@ -21,15 +21,18 @@ import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.ToString; import lombok.experimental.SuperBuilder; @Data +@ToString(callSuper = true) @SuperBuilder @EqualsAndHashCode(callSuper = true) public class MasterServerMetadata extends BaseServerMetadata implements Comparable { - public static MasterServerMetadata parseFromHeartBeat(MasterHeartBeat masterHeartBeat) { + public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) { return MasterServerMetadata.builder() + .serverStartupTime(masterHeartBeat.getStartupTime()) .address(masterHeartBeat.getHost() + ":" + masterHeartBeat.getPort()) .cpuUsage(masterHeartBeat.getCpuUsage()) .memoryUsage(masterHeartBeat.getMemoryUsage()) @@ -39,7 +42,7 @@ public class MasterServerMetadata extends BaseServerMetadata implements Comparab // Use the master address to sort the master server @Override - public int compareTo(MasterServerMetadata o) { + public int compareTo(final MasterServerMetadata o) { return this.getAddress().compareTo(o.getAddress()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java index 363dee0222..291b2f1758 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; @@ -53,6 +54,11 @@ public class WorkerClusters extends AbstractClusterSubscribeListener(workerMapping.values())); } + @Override + public Optional getServer(final String address) { + return Optional.ofNullable(workerMapping.get(address)); + } + public List getWorkerServerAddressByGroup(String workerGroup) { if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) { return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java index 61cef98a00..de9a03506d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java @@ -38,8 +38,9 @@ public class WorkerServerMetadata extends BaseServerMetadata { private final double taskThreadPoolUsage; - public static WorkerServerMetadata parseFromHeartBeat(WorkerHeartBeat workerHeartBeat) { + public static WorkerServerMetadata parseFromHeartBeat(final WorkerHeartBeat workerHeartBeat) { return WorkerServerMetadata.builder() + .serverStartupTime(workerHeartBeat.getStartupTime()) .address(workerHeartBeat.getHost() + ":" + workerHeartBeat.getPort()) .cpuUsage(workerHeartBeat.getCpuUsage()) .memoryUsage(workerHeartBeat.getMemoryUsage()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 9ff59e0cdb..bc1cfef2ec 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.config; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerConfigurationProperties; @@ -60,8 +59,6 @@ public class MasterConfig implements Validator { private MasterServerLoadProtection serverLoadProtection = new MasterServerLoadProtection(); - private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); - private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L); private CommandFetchStrategy commandFetchStrategy = new CommandFetchStrategy(); @@ -120,7 +117,6 @@ public class MasterConfig implements Validator { "\n workflow-event-bus-fire-thread-count -> " + workflowEventBusFireThreadCount + "\n max-heartbeat-interval -> " + maxHeartbeatInterval + "\n server-load-protection -> " + serverLoadProtection + - "\n registry-disconnect-strategy -> " + registryDisconnectStrategy + "\n master-address -> " + masterAddress + "\n master-registry-path: " + masterRegistryPath + "\n worker-group-refresh-interval: " + workerGroupRefreshInterval + diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/AbstractSystemEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/AbstractSystemEvent.java index 0df2a87cdf..17c1dfb296 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/AbstractSystemEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/AbstractSystemEvent.java @@ -27,8 +27,14 @@ public abstract class AbstractSystemEvent extends AbstractDelayEvent { super(delayTime); } + /** + * The event happen time. + */ public abstract Date getEventTime(); + /** + * The event type. + */ public abstract SystemEventType getEventType(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java index 9eab103f72..f2086a6569 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.engine.system.event; import static com.google.common.base.Preconditions.checkNotNull; +import org.apache.dolphinscheduler.server.master.cluster.MasterServerMetadata; + import java.util.Date; import lombok.Getter; @@ -26,20 +28,23 @@ import lombok.Getter; @Getter public class MasterFailoverEvent extends AbstractSystemEvent { - private final String masterAddress; + private final MasterServerMetadata masterServerMetadata; private final Date eventTime; - private MasterFailoverEvent(final String masterAddress, - final Date eventTime) { - super(eventTime.getTime()); - this.masterAddress = masterAddress; + private MasterFailoverEvent(final MasterServerMetadata masterServerMetadata, + final Date eventTime, + final long delayTime) { + super(delayTime); + this.masterServerMetadata = masterServerMetadata; this.eventTime = eventTime; } - public static MasterFailoverEvent of(final String masterAddress, final Date eventTime) { - checkNotNull(masterAddress); + public static MasterFailoverEvent of(final MasterServerMetadata masterServerMetadata, + final Date eventTime, + final long delayTime) { + checkNotNull(masterServerMetadata); checkNotNull(eventTime); - return new MasterFailoverEvent(masterAddress, eventTime); + return new MasterFailoverEvent(masterServerMetadata, eventTime, delayTime); } @Override @@ -50,8 +55,9 @@ public class MasterFailoverEvent extends AbstractSystemEvent { @Override public String toString() { return "MasterFailoverEvent{" + - "masterAddress='" + masterAddress + '\'' + + "masterServerMetadata='" + masterServerMetadata + '\'' + ", eventTime=" + eventTime + + ", delayTime=" + delayTime + '}'; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/WorkerFailoverEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/WorkerFailoverEvent.java index d985f0546b..4dd33a2bc2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/WorkerFailoverEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/WorkerFailoverEvent.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.engine.system.event; import static com.google.common.base.Preconditions.checkNotNull; +import org.apache.dolphinscheduler.server.master.cluster.WorkerServerMetadata; + import java.util.Date; import lombok.Getter; @@ -26,20 +28,23 @@ import lombok.Getter; @Getter public class WorkerFailoverEvent extends AbstractSystemEvent { - private final String workerAddress; + private final WorkerServerMetadata workerServerMetadata; private final Date eventTime; - private WorkerFailoverEvent(final String workerAddress, - final Date eventTime) { - super(eventTime.getTime()); - this.workerAddress = workerAddress; + private WorkerFailoverEvent(final WorkerServerMetadata workerServerMetadata, + final Date eventTime, + final long delayTime) { + super(delayTime); + this.workerServerMetadata = workerServerMetadata; this.eventTime = eventTime; } - public static WorkerFailoverEvent of(final String workerAddress, final Date eventTime) { - checkNotNull(workerAddress); + public static WorkerFailoverEvent of(final WorkerServerMetadata workerServerMetadata, + final Date eventTime, + final long delayTime) { + checkNotNull(workerServerMetadata); checkNotNull(eventTime); - return new WorkerFailoverEvent(workerAddress, eventTime); + return new WorkerFailoverEvent(workerServerMetadata, eventTime, delayTime); } @Override @@ -50,8 +55,9 @@ public class WorkerFailoverEvent extends AbstractSystemEvent { @Override public String toString() { return "WorkerFailoverEvent{" + - "workerAddress='" + workerAddress + '\'' + + "workerServerMetadata='" + workerServerMetadata + '\'' + ", eventTime=" + eventTime + + ", delayTime=" + delayTime + '}'; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java index 81b1e1e151..7bb24907ca 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java @@ -22,6 +22,10 @@ import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; +import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils; +import org.apache.dolphinscheduler.server.master.cluster.ClusterManager; +import org.apache.dolphinscheduler.server.master.cluster.MasterServerMetadata; +import org.apache.dolphinscheduler.server.master.cluster.WorkerServerMetadata; import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository; import org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent; import org.apache.dolphinscheduler.server.master.engine.system.event.MasterFailoverEvent; @@ -29,11 +33,11 @@ import org.apache.dolphinscheduler.server.master.engine.system.event.WorkerFailo import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; -import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.time.StopWatch; import java.util.Date; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -49,6 +53,9 @@ public class FailoverCoordinator implements IFailoverCoordinator { @Autowired private RegistryClient registryClient; + @Autowired + private ClusterManager clusterManager; + @Autowired private IWorkflowRepository workflowRepository; @@ -65,17 +72,23 @@ public class FailoverCoordinator implements IFailoverCoordinator { private WorkflowFailover workflowFailover; @Override - public void globalMasterFailover(GlobalMasterFailoverEvent globalMasterFailoverEvent) { + public void globalMasterFailover(final GlobalMasterFailoverEvent globalMasterFailoverEvent) { final StopWatch failoverTimeCost = StopWatch.createStarted(); log.info("Global master failover starting"); - final List masterFailoverEvents = workflowInstanceDao.queryNeedFailoverMasters() - .stream() - .map(masterAddress -> MasterFailoverEvent.of(masterAddress, globalMasterFailoverEvent.getEventTime())) - .collect(Collectors.toList()); - - if (CollectionUtils.isNotEmpty(masterFailoverEvents)) { - log.info("There are {} masters need to failover", masterFailoverEvents.size()); - masterFailoverEvents.forEach(this::failoverMaster); + final List masterAddressWhichContainsUnFinishedWorkflow = + workflowInstanceDao.queryNeedFailoverMasters(); + for (final String masterAddress : masterAddressWhichContainsUnFinishedWorkflow) { + final Optional aliveMasterOptional = + clusterManager.getMasterClusters().getServer(masterAddress); + if (aliveMasterOptional.isPresent()) { + final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get(); + log.info("The master[{}] is alive, do global master failover on it", aliveMasterServerMetadata); + doMasterFailover(aliveMasterServerMetadata.getAddress(), + aliveMasterServerMetadata.getServerStartupTime()); + } else { + log.info("The master[{}] is not alive, do global master failover on it", masterAddress); + doMasterFailover(masterAddress, globalMasterFailoverEvent.getEventTime().getTime()); + } } failoverTimeCost.stop(); @@ -84,16 +97,55 @@ public class FailoverCoordinator implements IFailoverCoordinator { @Override public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) { - final StopWatch failoverTimeCost = StopWatch.createStarted(); - final String masterAddress = masterFailoverEvent.getMasterAddress(); - log.info("Master[{}] failover starting", masterAddress); + final MasterServerMetadata masterServerMetadata = masterFailoverEvent.getMasterServerMetadata(); + log.info("Master[{}] failover starting", masterServerMetadata); + + final Optional aliveMasterOptional = + clusterManager.getMasterClusters().getServer(masterServerMetadata.getAddress()); + if (aliveMasterOptional.isPresent()) { + final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get(); + if (aliveMasterServerMetadata.getServerStartupTime() == masterServerMetadata.getServerStartupTime()) { + log.info("The master[{}] is alive, maybe it reconnect to registry skip failover", masterServerMetadata); + } else { + log.info("The master[{}] is alive, but the startup time is different, will failover on {}", + masterServerMetadata, + aliveMasterServerMetadata); + doMasterFailover(aliveMasterServerMetadata.getAddress(), + aliveMasterServerMetadata.getServerStartupTime()); + } + } else { + log.info("The master[{}] is not alive, will failover", masterServerMetadata); + doMasterFailover(masterServerMetadata.getAddress(), masterServerMetadata.getServerStartupTime()); + } + } + /** + * Do master failover. + *

Will failover the workflow which is scheduled by the master and the workflow's fire time is before the maxWorkflowFireTime. + */ + private void doMasterFailover(final String masterAddress, final long masterStartupTime) { + // We use lock to avoid multiple master failover at the same time. + // Once the workflow has been failovered, then it's state will be changed to FAILOVER + // Once the FAILOVER workflow has been refired, then it's host will be changed to the new master and have a new + // start time. + // So if a master has been failovered multiple times, there is no problem. + final StopWatch failoverTimeCost = StopWatch.createStarted(); registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath()); try { - final List needFailoverWorkflows = getFailoverWorkflowsForMaster(masterFailoverEvent); + final String failoverFinishedNodePath = + RegistryUtils.getFailoverFinishedNodePath(masterAddress, masterStartupTime); + if (registryClient.exists(failoverFinishedNodePath)) { + log.error("The master[{}-{}] is exist at: {}, means it has already been failovered, skip failover", + masterAddress, + masterStartupTime, + failoverFinishedNodePath); + return; + } + final List needFailoverWorkflows = + getFailoverWorkflowsForMaster(masterAddress, new Date(masterStartupTime)); needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow); - failoverTimeCost.stop(); + registryClient.persist(failoverFinishedNodePath, String.valueOf(System.currentTimeMillis())); log.info("Master[{}] failover {} workflows finished, cost: {}/ms", masterAddress, needFailoverWorkflows.size(), @@ -103,10 +155,11 @@ public class FailoverCoordinator implements IFailoverCoordinator { } } - private List getFailoverWorkflowsForMaster(final MasterFailoverEvent masterFailoverEvent) { + private List getFailoverWorkflowsForMaster(final String masterAddress, + final Date masterCrashTime) { // todo: use page query - final List workflowInstances = workflowInstanceDao.queryNeedFailoverWorkflowInstances( - masterFailoverEvent.getMasterAddress()); + final List workflowInstances = + workflowInstanceDao.queryNeedFailoverWorkflowInstances(masterAddress); return workflowInstances.stream() .filter(workflowInstance -> { @@ -117,25 +170,49 @@ public class FailoverCoordinator implements IFailoverCoordinator { // todo: If the first time run workflow have the restartTime, then we can only check this final Date restartTime = workflowInstance.getRestartTime(); if (restartTime != null) { - return restartTime.before(masterFailoverEvent.getEventTime()); + return restartTime.before(masterCrashTime); } final Date startTime = workflowInstance.getStartTime(); - return startTime.before(masterFailoverEvent.getEventTime()); + return startTime.before(masterCrashTime); }) .collect(Collectors.toList()); } @Override public void failoverWorker(final WorkerFailoverEvent workerFailoverEvent) { - final StopWatch failoverTimeCost = StopWatch.createStarted(); + final WorkerServerMetadata workerServerMetadata = workerFailoverEvent.getWorkerServerMetadata(); + log.info("Worker[{}] failover starting", workerServerMetadata); + + final Optional aliveWorkerOptional = + clusterManager.getWorkerClusters().getServer(workerServerMetadata.getAddress()); + if (aliveWorkerOptional.isPresent()) { + final WorkerServerMetadata aliveWorkerServerMetadata = aliveWorkerOptional.get(); + if (aliveWorkerServerMetadata.getServerStartupTime() == workerServerMetadata.getServerStartupTime()) { + log.info("The worker[{}] is alive, maybe it reconnect to registry skip failover", workerServerMetadata); + } else { + log.info("The worker[{}] is alive, but the startup time is different, will failover on {}", + workerServerMetadata, + aliveWorkerServerMetadata); + doWorkerFailover(aliveWorkerServerMetadata.getAddress(), + aliveWorkerServerMetadata.getServerStartupTime()); + } + } else { + log.info("The worker[{}] is not alive, will failover", workerServerMetadata); + doWorkerFailover(workerServerMetadata.getAddress(), workerServerMetadata.getServerStartupTime()); + } + } - final String workerAddress = workerFailoverEvent.getWorkerAddress(); - log.info("Worker[{}] failover starting", workerAddress); + private void doWorkerFailover(final String workerAddress, final long workerCrashTime) { + final StopWatch failoverTimeCost = StopWatch.createStarted(); - final List needFailoverTasks = getFailoverTaskForWorker(workerFailoverEvent); + final List needFailoverTasks = + getFailoverTaskForWorker(workerAddress, new Date(workerCrashTime)); needFailoverTasks.forEach(taskFailover::failoverTask); + registryClient.persist( + RegistryUtils.getFailoverFinishedNodePath(workerAddress, workerCrashTime), + String.valueOf(System.currentTimeMillis())); failoverTimeCost.stop(); log.info("Worker[{}] failover {} tasks finished, cost: {}/ms", workerAddress, @@ -143,9 +220,8 @@ public class FailoverCoordinator implements IFailoverCoordinator { failoverTimeCost.getTime()); } - private List getFailoverTaskForWorker(final WorkerFailoverEvent workerFailoverEvent) { - final String workerAddress = workerFailoverEvent.getWorkerAddress(); - final Date workerCrashTime = workerFailoverEvent.getEventTime(); + private List getFailoverTaskForWorker(final String workerAddress, + final Date workerCrashTime) { return workflowRepository.getAll() .stream() .map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java index 0a141b69e8..684482b7d2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java @@ -20,17 +20,17 @@ package org.apache.dolphinscheduler.server.master.registry; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionState; +import org.apache.dolphinscheduler.registry.api.RegistryClient; -import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @Slf4j public class MasterConnectionStateListener implements ConnectionListener { - private final MasterConnectStrategy masterConnectStrategy; + private final RegistryClient registryClient; - public MasterConnectionStateListener(@NonNull MasterConnectStrategy masterConnectStrategy) { - this.masterConnectStrategy = masterConnectStrategy; + public MasterConnectionStateListener(final RegistryClient registryClient) { + this.registryClient = registryClient; } @Override @@ -43,12 +43,13 @@ public class MasterConnectionStateListener implements ConnectionListener { case SUSPENDED: break; case RECONNECTED: - masterConnectStrategy.reconnect(); + log.warn("Master reconnect to registry"); break; case DISCONNECTED: - masterConnectStrategy.disconnect(); + registryClient.getStoppable().stop("Master disconnected from registry, will stop myself"); break; default: + log.warn("Unknown connection state: {}", state); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java index 23cf638208..88306bfa6c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; @@ -78,16 +79,27 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask { } @Override - public void writeHeartBeat(MasterHeartBeat masterHeartBeat) { + public void writeHeartBeat(final MasterHeartBeat masterHeartBeat) { + final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(masterHeartBeat); + if (registryClient.exists(failoverNodePath)) { + log.warn("The master: {} is under {}, means it has been failover will close myself", + masterHeartBeat, + failoverNodePath); + registryClient + .getStoppable() + .stop("The master exist: " + failoverNodePath + ", means it has been failover will close myself"); + return; + } String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat); registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson); MasterServerMetrics.incMasterHeartbeatCount(); log.debug("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}", - heartBeatPath, masterHeartBeatJson); + heartBeatPath, + masterHeartBeatJson); } - private ServerStatus getServerStatus(SystemMetrics systemMetrics, - MasterServerLoadProtection masterServerLoadProtection) { + private ServerStatus getServerStatus(final SystemMetrics systemMetrics, + final MasterServerLoadProtection masterServerLoadProtection) { return masterServerLoadProtection.isOverload(systemMetrics) ? ServerStatus.BUSY : ServerStatus.NORMAL; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index ea7da82e2b..94e2dc0513 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -53,9 +53,6 @@ public class MasterRegistryClient implements AutoCloseable { @Autowired private MetricsProvider metricsProvider; - @Autowired - private MasterConnectStrategy masterConnectStrategy; - private MasterHeartBeatTask masterHeartBeatTask; public void start() { @@ -63,7 +60,7 @@ public class MasterRegistryClient implements AutoCloseable { this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, metricsProvider, registryClient); // master registry registry(); - registryClient.addConnectionStateListener(new MasterConnectionStateListener(masterConnectStrategy)); + registryClient.addConnectionStateListener(new MasterConnectionStateListener(registryClient)); } catch (Exception e) { throw new RegistryException("Master registry client start up error", e); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java deleted file mode 100644 index bbc38d35fe..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.registry; - -import org.apache.dolphinscheduler.registry.api.RegistryClient; -import org.apache.dolphinscheduler.registry.api.StrategyType; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; - -/** - * This strategy will stop the master server, when disconnected from {@link org.apache.dolphinscheduler.registry.api.Registry}. - */ -@Service -@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true) -@Slf4j -public class MasterStopStrategy implements MasterConnectStrategy { - - @Autowired - private RegistryClient registryClient; - @Autowired - private MasterConfig masterConfig; - - @Override - public void disconnect() { - registryClient.getStoppable() - .stop("Master disconnected from registry, will stop myself due to the stop strategy"); - } - - @Override - public void reconnect() { - log.warn("The current connect strategy is stop, so the master will not reconnect to registry"); - } - - @Override - public StrategyType getStrategyType() { - return StrategyType.STOP; - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java deleted file mode 100644 index a18e823e09..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.registry; - -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException; -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; -import org.apache.dolphinscheduler.common.lifecycle.ServerStatus; -import org.apache.dolphinscheduler.registry.api.Registry; -import org.apache.dolphinscheduler.registry.api.RegistryClient; -import org.apache.dolphinscheduler.registry.api.RegistryException; -import org.apache.dolphinscheduler.registry.api.StrategyType; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository; - -import java.time.Duration; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; - -/** - * This strategy will change the server status to {@link ServerStatus#WAITING} when disconnect from {@link Registry}. - */ -@Service -@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "waiting") -@Slf4j -public class MasterWaitingStrategy implements MasterConnectStrategy { - - @Autowired - private MasterConfig masterConfig; - @Autowired - private RegistryClient registryClient; - @Autowired - private IWorkflowRepository IWorkflowRepository; - - @Override - public void disconnect() { - try { - ServerLifeCycleManager.toWaiting(); - clearMasterResource(); - Duration maxWaitingTime = masterConfig.getRegistryDisconnectStrategy().getMaxWaitingTime(); - try { - log.info("Master disconnect from registry will try to reconnect in {} s", - maxWaitingTime.getSeconds()); - registryClient.connectUntilTimeout(maxWaitingTime); - } catch (RegistryException ex) { - throw new ServerLifeCycleException( - String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex); - } - } catch (ServerLifeCycleException e) { - String errorMessage = String.format( - "Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server", - ServerLifeCycleManager.getServerStatus()); - log.error(errorMessage, e); - registryClient.getStoppable().stop(errorMessage); - } catch (RegistryException ex) { - String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server"; - log.error(errorMessage, ex); - registryClient.getStoppable().stop(errorMessage); - } catch (Exception ex) { - String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server"; - log.error(errorMessage, ex); - registryClient.getStoppable().stop(errorMessage); - } - } - - @Override - public void reconnect() { - if (ServerLifeCycleManager.isRunning()) { - log.info("no need to reconnect, as the current server status is running"); - } else { - try { - ServerLifeCycleManager.recoverFromWaiting(); - log.info("Recover from waiting success, the current server status is {}", - ServerLifeCycleManager.getServerStatus()); - } catch (Exception e) { - String errorMessage = - String.format( - "Recover from waiting failed, the current server status is %s, will stop the server", - ServerLifeCycleManager.getServerStatus()); - log.error(errorMessage, e); - registryClient.getStoppable().stop(errorMessage); - } - } - } - - @Override - public StrategyType getStrategyType() { - return StrategyType.WAITING; - } - - private void clearMasterResource() { - log.warn("Master clear workflow event queue due to lost registry connection"); - IWorkflowRepository.clear(); - log.warn("Master clear workflow instance cache due to lost registry connection"); - - } - -} diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index 964fca836c..5521105f44 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -102,9 +102,6 @@ master: max-system-memory-usage-percentage-thresholds: 0.7 # Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. max-disk-usage-percentage-thresholds: 0.7 - registry-disconnect-strategy: - # The disconnect strategy: stop, waiting - strategy: stop worker-group-refresh-interval: 10s command-fetch-strategy: type: ID_SLOT_BASED diff --git a/dolphinscheduler-master/src/test/resources/application.yaml b/dolphinscheduler-master/src/test/resources/application.yaml index 8ab6e7de0b..7f8f1594a6 100644 --- a/dolphinscheduler-master/src/test/resources/application.yaml +++ b/dolphinscheduler-master/src/test/resources/application.yaml @@ -67,9 +67,6 @@ master: memory-usage-weight: 40 cpu-usage-weight: 30 task-thread-pool-usage-weight: 30 - registry-disconnect-strategy: - # The disconnect strategy: stop, waiting - strategy: stop worker-group-refresh-interval: 10s command-fetch-strategy: type: ID_SLOT_BASED diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java index 448a46841e..4617a4869f 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java @@ -26,6 +26,4 @@ public interface ConnectStrategy { void reconnect(); - StrategyType getStrategyType(); - } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java deleted file mode 100644 index 5457a9e44a..0000000000 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.registry.api; - -import java.time.Duration; - -import lombok.Data; - -@Data -public class ConnectStrategyProperties { - - private StrategyType strategy = StrategyType.STOP; - - private Duration maxWaitingTime = Duration.ofSeconds(0); - -} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java index 3ddce0eee8..df0587019a 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -67,6 +68,10 @@ public class RegistryClient { if (!registry.exists(RegistryNodeType.ALERT_SERVER.getRegistryPath())) { registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(), EMPTY, false); } + if (!registry.exists(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath())) { + registry.put(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath(), EMPTY, false); + } + cleanHistoryFailoverFinishedNodes(); } public boolean isConnected() { @@ -168,6 +173,11 @@ public class RegistryClient { registry.put(key, value, true); } + public void persist(String key, String value) { + log.info("persist key: {}, value: {}", key, value); + registry.put(key, value, false); + } + public void remove(String key) { registry.delete(key); } @@ -222,4 +232,27 @@ public class RegistryClient { private Collection getServerNodes(RegistryNodeType nodeType) { return getChildrenKeys(nodeType.getRegistryPath()); } + + private void cleanHistoryFailoverFinishedNodes() { + // Clean the history failover finished nodes + // which failover is before the current time minus 1 week + final Collection failoverFinishedNodes = + registry.children(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath()); + if (CollectionUtils.isEmpty(failoverFinishedNodes)) { + return; + } + for (final String failoverFinishedNode : failoverFinishedNodes) { + try { + final String failoverFinishTime = registry.get(failoverFinishedNode); + if (System.currentTimeMillis() - Long.parseLong(failoverFinishTime) > TimeUnit.DAYS.toMillis(7)) { + registry.delete(failoverFinishedNode); + log.info( + "Clear the failover finished node: {} which failover time is before the current time minus 1 week", + failoverFinishedNode); + } + } catch (Exception ex) { + log.error("Failed to clean the failoverFinishedNode: {}", failoverFinishedNode, ex); + } + } + } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java deleted file mode 100644 index 214177ec44..0000000000 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.registry.api; - -public enum StrategyType { - - STOP, - WAITING, - ; -} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java index 31a0f8d5aa..d39e95a84f 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java @@ -24,6 +24,8 @@ import lombok.Getter; @AllArgsConstructor public enum RegistryNodeType { + FAILOVER_FINISH_NODES("FailoverFinishNodes", "/nodes/failover-finish-nodes"), + MASTER("Master", "/nodes/master"), MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"), MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java similarity index 53% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java rename to dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java index 4cecadce16..25ef976ed5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java @@ -15,10 +15,19 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.registry; +package org.apache.dolphinscheduler.registry.api.utils; -import org.apache.dolphinscheduler.registry.api.ConnectStrategy; +import org.apache.dolphinscheduler.common.model.BaseHeartBeat; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; -public interface MasterConnectStrategy extends ConnectStrategy { +public class RegistryUtils { + public static String getFailoverFinishedNodePath(final BaseHeartBeat baseHeartBeat) { + return getFailoverFinishedNodePath(baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(), + baseHeartBeat.getStartupTime()); + } + + public static String getFailoverFinishedNodePath(final String masterAddress, final long masterStartupTime) { + return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + masterAddress + "-" + masterStartupTime; + } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index d901effbd1..e391916a09 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.config; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.commons.lang3.StringUtils; @@ -46,7 +45,6 @@ public class WorkerConfig implements Validator { private Duration maxHeartbeatInterval = Duration.ofSeconds(10); private int hostWeight = 100; private WorkerServerLoadProtection serverLoadProtection = new WorkerServerLoadProtection(); - private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); /** * This field doesn't need to set at config file, it will be calculated by workerIp:listenPort @@ -90,7 +88,6 @@ public class WorkerConfig implements Validator { "\n host-weight -> " + hostWeight + "\n tenantConfig -> " + tenantConfig + "\n server-load-protection -> " + serverLoadProtection + - "\n registry-disconnect-strategy -> " + registryDisconnectStrategy + "\n task-execute-threads-full-policy: " + taskExecuteThreadsFullPolicy + "\n address -> " + workerAddress + "\n registry-path: " + workerRegistryPath + diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java deleted file mode 100644 index 260b7e0390..0000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.registry; - -import org.apache.dolphinscheduler.registry.api.ConnectStrategy; - -public interface WorkerConnectStrategy extends ConnectStrategy { - -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java index d070a90b34..af046b2f57 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java @@ -20,21 +20,17 @@ package org.apache.dolphinscheduler.server.worker.registry; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionState; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.registry.api.RegistryClient; -import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @Slf4j public class WorkerConnectionStateListener implements ConnectionListener { - private final WorkerConfig workerConfig; - private final WorkerConnectStrategy workerConnectStrategy; + private final RegistryClient registryClient; - public WorkerConnectionStateListener(@NonNull WorkerConfig workerConfig, - @NonNull WorkerConnectStrategy workerConnectStrategy) { - this.workerConfig = workerConfig; - this.workerConnectStrategy = workerConnectStrategy; + public WorkerConnectionStateListener(final RegistryClient registryClient) { + this.registryClient = registryClient; } @Override @@ -47,10 +43,10 @@ public class WorkerConnectionStateListener implements ConnectionListener { case SUSPENDED: break; case RECONNECTED: - workerConnectStrategy.reconnect(); + log.warn("Worker reconnect to registry"); break; case DISCONNECTED: - workerConnectStrategy.disconnect(); + registryClient.getStoppable().stop("Worker disconnected from registry, will stop myself"); default: } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 3fca5caee1..dd0375caf8 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -46,7 +46,6 @@ import javax.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @Slf4j @@ -62,10 +61,6 @@ public class WorkerRegistryClient implements AutoCloseable { @Autowired private RegistryClient registryClient; - @Autowired - @Lazy - private WorkerConnectStrategy workerConnectStrategy; - @Autowired private MetricsProvider metricsProvider; @@ -83,8 +78,7 @@ public class WorkerRegistryClient implements AutoCloseable { public void start() { try { registry(); - registryClient.addConnectionStateListener( - new WorkerConnectionStateListener(workerConfig, workerConnectStrategy)); + registryClient.addConnectionStateListener(new WorkerConnectionStateListener(registryClient)); } catch (Exception ex) { throw new RegistryException("Worker registry client start up error", ex); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java deleted file mode 100644 index 4c575a29e8..0000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.registry; - -import org.apache.dolphinscheduler.registry.api.RegistryClient; -import org.apache.dolphinscheduler.registry.api.StrategyType; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; - -@Service -@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true) -@Slf4j -public class WorkerStopStrategy implements WorkerConnectStrategy { - - @Autowired - public RegistryClient registryClient; - - @Override - public void disconnect() { - registryClient.getStoppable() - .stop("Worker disconnected from registry, will stop myself due to the stop strategy"); - } - - @Override - public void reconnect() { - log.warn("The current connect strategy is stop, so the worker will not reconnect to registry"); - } - - @Override - public StrategyType getStrategyType() { - return StrategyType.STOP; - } -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java deleted file mode 100644 index 2be156b5df..0000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.registry; - -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException; -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; -import org.apache.dolphinscheduler.registry.api.RegistryClient; -import org.apache.dolphinscheduler.registry.api.RegistryException; -import org.apache.dolphinscheduler.registry.api.StrategyType; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; -import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder; -import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; - -import java.time.Duration; - -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; - -@Service -@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name = "strategy", havingValue = "waiting") -@Slf4j -public class WorkerWaitingStrategy implements WorkerConnectStrategy { - - @Autowired - private WorkerConfig workerConfig; - - @Autowired - private RegistryClient registryClient; - - @Autowired - private MessageRetryRunner messageRetryRunner; - - @Autowired - private WorkerTaskExecutorThreadPool workerManagerThread; - - public WorkerWaitingStrategy(@NonNull WorkerConfig workerConfig, - @NonNull RegistryClient registryClient, - @NonNull MessageRetryRunner messageRetryRunner, - @NonNull WorkerTaskExecutorThreadPool workerManagerThread) { - this.workerConfig = workerConfig; - this.registryClient = registryClient; - this.messageRetryRunner = messageRetryRunner; - this.workerManagerThread = workerManagerThread; - } - - @Override - public void disconnect() { - try { - ServerLifeCycleManager.toWaiting(); - clearWorkerResource(); - Duration maxWaitingTime = workerConfig.getRegistryDisconnectStrategy().getMaxWaitingTime(); - try { - log.info("Worker disconnect from registry will try to reconnect in {} s", - maxWaitingTime.getSeconds()); - registryClient.connectUntilTimeout(maxWaitingTime); - } catch (RegistryException ex) { - throw new ServerLifeCycleException( - String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex); - } - - } catch (ServerLifeCycleException e) { - String errorMessage = String.format( - "Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server", - ServerLifeCycleManager.getServerStatus()); - log.error(errorMessage, e); - registryClient.getStoppable().stop(errorMessage); - } catch (RegistryException ex) { - String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server"; - log.error(errorMessage, ex); - registryClient.getStoppable().stop(errorMessage); - } catch (Exception ex) { - String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server"; - log.error(errorMessage, ex); - registryClient.getStoppable().stop(errorMessage); - } - } - - @Override - public void reconnect() { - if (ServerLifeCycleManager.isRunning()) { - log.info("no need to reconnect, as the current server status is running"); - } else { - try { - ServerLifeCycleManager.recoverFromWaiting(); - log.info("Recover from waiting success, the current server status is {}", - ServerLifeCycleManager.getServerStatus()); - } catch (Exception e) { - String errorMessage = - String.format( - "Recover from waiting failed, the current server status is %s, will stop the server", - ServerLifeCycleManager.getServerStatus()); - log.error(errorMessage, e); - registryClient.getStoppable().stop(errorMessage); - } - } - } - - @Override - public StrategyType getStrategyType() { - return StrategyType.WAITING; - } - - private void clearWorkerResource() { - workerManagerThread.clearTask(); - WorkerTaskExecutorHolder.clear(); - log.warn("Worker server clear the tasks due to lost connection from registry"); - messageRetryRunner.clearMessage(); - log.warn("Worker server clear the retry message due to lost connection from registry"); - } - -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java index 13f585d92a..a2e3f37825 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection; import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; @@ -81,14 +82,25 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask { } @Override - public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) { + public void writeHeartBeat(final WorkerHeartBeat workerHeartBeat) { + final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(workerHeartBeat); + if (registryClient.exists(failoverNodePath)) { + log.warn("The worker: {} is under {}, means it has been failover will close myself", + workerHeartBeat, + failoverNodePath); + registryClient + .getStoppable() + .stop("The worker exist: " + failoverNodePath + ", means it has been failover will close myself"); + return; + } String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat); String workerRegistryPath = workerConfig.getWorkerRegistryPath(); registryClient.persistEphemeral(workerRegistryPath, workerHeartBeatJson); WorkerServerMetrics.incWorkerHeartbeatCount(); log.debug( "Success write worker group heartBeatInfo into registry, workerRegistryPath: {} workerHeartBeatInfo: {}", - workerRegistryPath, workerHeartBeatJson); + workerRegistryPath, + workerHeartBeatJson); } private ServerStatus getServerStatus(SystemMetrics systemMetrics, diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml index 5cac4c29e5..c4077e623d 100644 --- a/dolphinscheduler-worker/src/main/resources/application.yaml +++ b/dolphinscheduler-worker/src/main/resources/application.yaml @@ -59,9 +59,6 @@ worker: max-system-memory-usage-percentage-thresholds: 0.7 # Worker max disk usage , when the worker's disk usage is smaller then this value, worker server can be dispatched tasks. max-disk-usage-percentage-thresholds: 0.7 - registry-disconnect-strategy: - # The disconnect strategy: stop, waiting - strategy: stop task-execute-threads-full-policy: REJECT tenant-config: # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java index 14d75779b9..c04ec14430 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.mockito.Mockito.times; import org.apache.dolphinscheduler.registry.api.ConnectionState; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -28,8 +28,6 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * worker registry test @@ -37,24 +35,23 @@ import org.slf4j.LoggerFactory; @ExtendWith(MockitoExtension.class) public class WorkerConnectionStateListenerTest { - private static final Logger log = LoggerFactory.getLogger(WorkerConnectionStateListenerTest.class); @InjectMocks private WorkerConnectionStateListener workerConnectionStateListener; + @Mock - private WorkerConfig workerConfig; - @Mock - private WorkerConnectStrategy workerConnectStrategy; + private RegistryClient registryClient; @Test public void testWorkerConnectionStateListener() { - workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED); + Mockito.when(registryClient.getStoppable()).thenReturn(cause -> { + // do nothing + }); + workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED); workerConnectionStateListener.onUpdate(ConnectionState.RECONNECTED); - Mockito.verify(workerConnectStrategy, times(1)).reconnect(); - workerConnectionStateListener.onUpdate(ConnectionState.SUSPENDED); - + Mockito.verify(registryClient, times(0)).getStoppable(); workerConnectionStateListener.onUpdate(ConnectionState.DISCONNECTED); - Mockito.verify(workerConnectStrategy, times(1)).disconnect(); + Mockito.verify(registryClient, times(1)).getStoppable(); } } diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java index 8c7071cf60..54f9033e4e 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java @@ -63,8 +63,6 @@ public class WorkerRegistryClientTest { @Mock private WorkerTaskExecutorThreadPool workerManagerThread; @Mock - private WorkerConnectStrategy workerConnectStrategy; - @Mock private IStoppable stoppable; @Test diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java deleted file mode 100644 index 671fac5277..0000000000 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.registry; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.doNothing; - -import org.apache.dolphinscheduler.common.IStoppable; -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException; -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; -import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; -import org.apache.dolphinscheduler.registry.api.RegistryClient; -import org.apache.dolphinscheduler.registry.api.RegistryException; -import org.apache.dolphinscheduler.registry.api.StrategyType; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; -import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer; -import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; - -import java.time.Duration; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.MockedStatic; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * worker registry test - */ -@ExtendWith(MockitoExtension.class) -public class WorkerStrategyTest { - - private static final Logger log = LoggerFactory.getLogger(WorkerStrategyTest.class); - @Mock - private RegistryClient registryClient; - @Mock - private IStoppable stoppable; - @Mock - private WorkerConfig workerConfig; - @Mock - private WorkerRpcServer workerRpcServer; - @Mock - private MessageRetryRunner messageRetryRunner; - @Mock - private WorkerTaskExecutorThreadPool workerManagerThread; - @Mock - private ConnectStrategyProperties connectStrategyProperties; - - @Test - public void testWorkerStopStrategy() { - given(registryClient.getStoppable()) - .willReturn(stoppable); - WorkerStopStrategy workerStopStrategy = new WorkerStopStrategy(); - workerStopStrategy.registryClient = registryClient; - workerStopStrategy.reconnect(); - workerStopStrategy.disconnect(); - Assertions.assertEquals(workerStopStrategy.getStrategyType(), StrategyType.STOP); - } - - @Test - public void testWorkerWaitingStrategyreconnect() { - WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy( - workerConfig, - registryClient, - messageRetryRunner, - workerManagerThread); - Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING); - - try ( - MockedStatic serverLifeCycleManagerMockedStatic = - Mockito.mockStatic(ServerLifeCycleManager.class)) { - serverLifeCycleManagerMockedStatic - .when(() -> ServerLifeCycleManager.isRunning()) - .thenReturn(true); - workerWaitingStrategy.reconnect(); - - } - - try ( - MockedStatic serverLifeCycleManagerMockedStatic = - Mockito.mockStatic(ServerLifeCycleManager.class)) { - doNothing().when(stoppable).stop(anyString()); - given(registryClient.getStoppable()) - .willReturn(stoppable); - serverLifeCycleManagerMockedStatic - .when(() -> ServerLifeCycleManager.recoverFromWaiting()) - .thenThrow(new ServerLifeCycleException("")); - workerWaitingStrategy.reconnect(); - } - - try ( - MockedStatic serverLifeCycleManagerMockedStatic = - Mockito.mockStatic(ServerLifeCycleManager.class)) { - serverLifeCycleManagerMockedStatic - .when(() -> ServerLifeCycleManager.recoverFromWaiting()) - .thenAnswer(invocation -> null); - workerWaitingStrategy.reconnect(); - } - } - - @Test - public void testWorkerWaitingStrategydisconnect() { - WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy( - workerConfig, - registryClient, - messageRetryRunner, - workerManagerThread); - Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING); - - try ( - MockedStatic serverLifeCycleManagerMockedStatic = - Mockito.mockStatic(ServerLifeCycleManager.class)) { - doNothing().when(stoppable).stop(anyString()); - given(registryClient.getStoppable()) - .willReturn(stoppable); - serverLifeCycleManagerMockedStatic - .when(() -> ServerLifeCycleManager.toWaiting()) - .thenThrow(new ServerLifeCycleException("")); - workerWaitingStrategy.disconnect(); - } - - try ( - MockedStatic serverLifeCycleManagerMockedStatic = - Mockito.mockStatic(ServerLifeCycleManager.class)) { - given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1)); - given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties); - Mockito.reset(registryClient); - doNothing().when(registryClient).connectUntilTimeout(any()); - serverLifeCycleManagerMockedStatic - .when(() -> ServerLifeCycleManager.toWaiting()) - .thenAnswer(invocation -> null); - workerWaitingStrategy.disconnect(); - } - - try ( - MockedStatic serverLifeCycleManagerMockedStatic = - Mockito.mockStatic(ServerLifeCycleManager.class)) { - given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1)); - given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties); - Mockito.reset(registryClient); - doNothing().when(stoppable).stop(anyString()); - given(registryClient.getStoppable()) - .willReturn(stoppable); - Mockito.doThrow(new RegistryException("TEST")).when(registryClient).connectUntilTimeout(any()); - serverLifeCycleManagerMockedStatic - .when(() -> ServerLifeCycleManager.toWaiting()) - .thenAnswer(invocation -> null); - workerWaitingStrategy.disconnect(); - } - - try ( - MockedStatic serverLifeCycleManagerMockedStatic = - Mockito.mockStatic(ServerLifeCycleManager.class)) { - Mockito.reset(workerConfig); - given(workerConfig.getRegistryDisconnectStrategy()).willThrow(new NullPointerException("")); - doNothing().when(stoppable).stop(anyString()); - given(registryClient.getStoppable()) - .willReturn(stoppable); - serverLifeCycleManagerMockedStatic - .when(() -> ServerLifeCycleManager.toWaiting()) - .thenAnswer(invocation -> null); - workerWaitingStrategy.disconnect(); - } - } -}