diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java index bec0f759cd..d28cd3db08 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java @@ -184,6 +184,7 @@ public class HeartBeat { * update server state */ public void updateServerState() { + this.reportTime = System.currentTimeMillis(); if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) { logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory); diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java index 64b0b13d11..e84666a416 100644 --- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java +++ b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java @@ -148,14 +148,12 @@ public class ZookeeperRegistry implements Registry { String dataPath = null; switch (type) { case NODE_ADDED: - dataPath = event.getData().getPath(); eventType = DataChangeEvent.ADD; break; case NODE_UPDATED: eventType = DataChangeEvent.UPDATE; dataPath = event.getData().getPath(); - break; case NODE_REMOVED: eventType = DataChangeEvent.REMOVE; @@ -164,7 +162,7 @@ public class ZookeeperRegistry implements Registry { default: } if (null != eventType && null != dataPath) { - ListenerManager.dataChange(path, dataPath, eventType); + ListenerManager.dataChange(path, dataPath, new String(event.getData().getData()), eventType); } }; treeCache.getListenable().addListener(treeCacheListener); diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java index 900c7e4173..a5dc33b12e 100644 --- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java +++ b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java @@ -115,7 +115,7 @@ public class ZookeeperRegistryTest { class TestListener implements SubscribeListener { @Override - public void notify(String path, DataChangeEvent dataChangeEvent) { + public void notify(String path, String data, DataChangeEvent dataChangeEvent) { logger.info("I'm test listener"); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index f78b95736a..64a15b5b95 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -78,7 +78,7 @@ public class LowerWeightHostManager extends CommonHostManager { this.workerHostWeightsMap = new ConcurrentHashMap<>(); this.lock = new ReentrantLock(); this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); - this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 5, TimeUnit.SECONDS); + this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 1, TimeUnit.SECONDS); } @PreDestroy diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java index c712ac096d..4fd50a01bc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java @@ -39,9 +39,8 @@ public class MasterRegistryDataListener implements SubscribeListener { masterRegistryClient = SpringApplicationContext.getBean(MasterRegistryClient.class); } - @Override - public void notify(String path, DataChangeEvent event) { + public void notify(String path, String data, DataChangeEvent event) { //monitor master if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) { handleMasterEvent(event, path); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 09f4cc2433..1bceeb7aad 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -190,7 +190,7 @@ public class ServerNodeManager implements InitializingBean { public void run() { // sync worker node info Map newWorkerNodeInfo = registryClient.getServerMaps(NodeType.WORKER, true); - syncWorkerNodeInfo(newWorkerNodeInfo); + syncAllWorkerNodeInfo(newWorkerNodeInfo); // sync worker group nodes from database List workerGroupList = workerGroupMapper.queryAllWorkerGroup(); @@ -218,7 +218,7 @@ public class ServerNodeManager implements InitializingBean { class WorkerDataListener implements SubscribeListener { @Override - public void notify(String path, DataChangeEvent dataChangeEvent) { + public void notify(String path, String data, DataChangeEvent dataChangeEvent) { if (registryClient.isWorkerPath(path)) { try { if (dataChangeEvent == DataChangeEvent.ADD) { @@ -233,6 +233,14 @@ public class ServerNodeManager implements InitializingBean { Set currentNodes = registryClient.getWorkerGroupNodesDirectly(group); syncWorkerGroupNodes(group, currentNodes); alertDao.sendServerStopedAlert(1, path, "WORKER"); + } else if (dataChangeEvent == DataChangeEvent.UPDATE) { + logger.debug("worker group node : {} update, data: {}", path, data); + String group = parseGroup(path); + Set currentNodes = registryClient.getWorkerGroupNodesDirectly(group); + syncWorkerGroupNodes(group, currentNodes); + + String node = parseNode(path); + syncSingleWorkerNodeInfo(node, data); } } catch (IllegalArgumentException ex) { logger.warn(ex.getMessage()); @@ -251,6 +259,13 @@ public class ServerNodeManager implements InitializingBean { return parts[parts.length - 2]; } + private String parseNode(String path) { + String[] parts = path.split("/"); + if (parts.length < WORKER_LISTENER_CHECK_LENGTH) { + throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path)); + } + return parts[parts.length - 1]; + } } /** @@ -258,7 +273,7 @@ public class ServerNodeManager implements InitializingBean { */ class MasterDataListener implements SubscribeListener { @Override - public void notify(String path, DataChangeEvent dataChangeEvent) { + public void notify(String path, String data, DataChangeEvent dataChangeEvent) { if (registryClient.isMasterPath(path)) { try { if (dataChangeEvent.equals(DataChangeEvent.ADD)) { @@ -407,7 +422,7 @@ public class ServerNodeManager implements InitializingBean { * * @param newWorkerNodeInfo new worker node info */ - private void syncWorkerNodeInfo(Map newWorkerNodeInfo) { + private void syncAllWorkerNodeInfo(Map newWorkerNodeInfo) { workerNodeInfoLock.lock(); try { workerNodeInfo.clear(); @@ -417,6 +432,18 @@ public class ServerNodeManager implements InitializingBean { } } + /** + * sync single worker node info + */ + private void syncSingleWorkerNodeInfo(String node, String info) { + workerNodeInfoLock.lock(); + try { + workerNodeInfo.put(node, info); + } finally { + workerNodeInfoLock.unlock(); + } + } + /** * destroy */ diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java index ee134058f0..94b13e6656 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java @@ -55,12 +55,12 @@ public class ListenerManager { * *After the data changes, it is distributed to the corresponding listener for processing */ - public static void dataChange(String key,String path, DataChangeEvent dataChangeEvent) { + public static void dataChange(String key,String path, String data, DataChangeEvent dataChangeEvent) { SubscribeListener notifyListener = listeners.get(key); if (null == notifyListener) { return; } - notifyListener.notify(path,dataChangeEvent); + notifyListener.notify(path, data, dataChangeEvent); } } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java index 6a2f3d1b6e..3db7f2e1a2 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java @@ -25,6 +25,6 @@ public interface SubscribeListener { /** * Processing logic when the subscription node changes */ - void notify(String path, DataChangeEvent dataChangeEvent); + void notify(String path, String data, DataChangeEvent dataChangeEvent); }