From d07a2edae0e7ef1b76fe32d0bc4d6e3d8eab7c52 Mon Sep 17 00:00:00 2001 From: lile <38578667+reele@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:49:55 +0800 Subject: [PATCH] [Fix-16793] WorkerGroupChangeNotifier may can not detect cluster's first time change (#16796) --- .../server/master/cluster/ClusterManager.java | 1 + .../master/cluster/WorkerGroupChangeNotifier.java | 14 ++++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java index 54cb12e077..bb49133351 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java @@ -51,6 +51,7 @@ public class ClusterManager { this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), masterClusters); this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), workerClusters); this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters); + this.workerGroupChangeNotifier.start(); log.info("ClusterManager started..."); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java index 6875f115b4..41c5fe0f9d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java @@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.cluster; import org.apache.dolphinscheduler.common.utils.MapComparator; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory; import org.apache.commons.collections4.CollectionUtils; -import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -43,7 +44,8 @@ import org.springframework.stereotype.Component; @Component public class WorkerGroupChangeNotifier { - private static final long DEFAULT_REFRESH_WORKER_INTERVAL = Duration.ofMinutes(1).toMillis(); + @Autowired + private MasterConfig masterConfig; private final WorkerGroupDao workerGroupDao; private final List listeners = new CopyOnWriteArrayList<>(); @@ -52,11 +54,15 @@ public class WorkerGroupChangeNotifier { public WorkerGroupChangeNotifier(WorkerGroupDao workerGroupDao) { this.workerGroupDao = workerGroupDao; + } + + public void start() { detectWorkerGroupChanges(); + final long workerGroupRefreshIntervalSeconds = masterConfig.getWorkerGroupRefreshInterval().getSeconds(); MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( this::detectWorkerGroupChanges, - DEFAULT_REFRESH_WORKER_INTERVAL, - DEFAULT_REFRESH_WORKER_INTERVAL, + workerGroupRefreshIntervalSeconds, + workerGroupRefreshIntervalSeconds, TimeUnit.SECONDS); }