diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java index 54cb12e077..bb49133351 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java @@ -51,6 +51,7 @@ public class ClusterManager { this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), masterClusters); this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), workerClusters); this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters); + this.workerGroupChangeNotifier.start(); log.info("ClusterManager started..."); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java index 6875f115b4..41c5fe0f9d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java @@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.cluster; import org.apache.dolphinscheduler.common.utils.MapComparator; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory; import org.apache.commons.collections4.CollectionUtils; -import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -43,7 +44,8 @@ import org.springframework.stereotype.Component; @Component public class WorkerGroupChangeNotifier { - private static final long DEFAULT_REFRESH_WORKER_INTERVAL = Duration.ofMinutes(1).toMillis(); + @Autowired + private MasterConfig masterConfig; private final WorkerGroupDao workerGroupDao; private final List listeners = new CopyOnWriteArrayList<>(); @@ -52,11 +54,15 @@ public class WorkerGroupChangeNotifier { public WorkerGroupChangeNotifier(WorkerGroupDao workerGroupDao) { this.workerGroupDao = workerGroupDao; + } + + public void start() { detectWorkerGroupChanges(); + final long workerGroupRefreshIntervalSeconds = masterConfig.getWorkerGroupRefreshInterval().getSeconds(); MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( this::detectWorkerGroupChanges, - DEFAULT_REFRESH_WORKER_INTERVAL, - DEFAULT_REFRESH_WORKER_INTERVAL, + workerGroupRefreshIntervalSeconds, + workerGroupRefreshIntervalSeconds, TimeUnit.SECONDS); }