diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index d713c8366f..7e0268f1f9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -139,11 +139,11 @@ public class ServerNodeManager implements InitializingBean { /** * init MasterNodeListener listener */ - registryCenter.getRegisterOperator().addListener(new MasterNodeListener()); + registryCenter.getRegisterOperator().registerListener(new MasterNodeListener(Integer.MAX_VALUE)); /** * init WorkerNodeListener listener */ - registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener()); + registryCenter.getRegisterOperator().registerListener(new WorkerGroupNodeListener(Integer.MAX_VALUE)); } /** @@ -207,6 +207,10 @@ public class ServerNodeManager implements InitializingBean { */ class WorkerGroupNodeListener extends AbstractListener { + public WorkerGroupNodeListener(int order) { + super(order); + } + @Override protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { if (registryCenter.isWorkerPath(path)) { @@ -246,6 +250,10 @@ public class ServerNodeManager implements InitializingBean { */ class MasterNodeListener extends AbstractListener { + public MasterNodeListener(int order) { + super(order); + } + @Override protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { if (registryCenter.isMasterPath(path)) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java index 8daf8a153a..1f98ca0ac1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.zk.AbstractListener; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; import org.apache.commons.lang.StringUtils; @@ -97,7 +98,7 @@ public class ZKMasterClient extends AbstractZKClient { removeZKNodePath(null, ZKNodeType.MASTER, true); removeZKNodePath(null, ZKNodeType.WORKER, true); } - registerListener(); + registerListener(new NodeChangeListener(Integer.MIN_VALUE)); } catch (Exception e) { logger.error("master start up exception", e); } finally { @@ -115,21 +116,28 @@ public class ZKMasterClient extends AbstractZKClient { super.close(); } - /** - * handle path events that this class cares about - * - * @param client zkClient - * @param event path event - * @param path zk path - */ - @Override - protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - //monitor master - if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH)) { - handleMasterEvent(event, path); - } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH)) { - //monitor worker - handleWorkerEvent(event, path); + class NodeChangeListener extends AbstractListener { + + public NodeChangeListener(int order) { + super(order); + } + + /** + * handle path events that this class cares about + * + * @param client zkClient + * @param event path event + * @param path zk path + */ + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + //monitor master + if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH)) { + handleMasterEvent(event, path); + } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH)) { + //monitor worker + handleWorkerEvent(event, path); + } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java index 3e3e6c8c20..f27d32267a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java @@ -21,7 +21,16 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; -public abstract class AbstractListener implements TreeCacheListener { +public abstract class AbstractListener implements TreeCacheListener, Comparable { + + /** + * The order is represent as prioritization, the high order will be executed first + */ + private final int order; + + public AbstractListener(int order) { + this.order = order; + } @Override public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception { @@ -33,4 +42,9 @@ public abstract class AbstractListener implements TreeCacheListener { } protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path); + + @Override + public int compareTo(AbstractListener o) { + return order - o.order; + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java index 647a3c9189..323dcb2715 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java @@ -16,37 +16,35 @@ */ package org.apache.dolphinscheduler.service.zk; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; -import java.nio.charset.StandardCharsets; - @Component public class ZookeeperCachedOperator extends ZookeeperOperator { private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class); - private TreeCache treeCache; + + /** + * The main point to define a listener list here is to execute the listener at customize order. + */ + private List listenerList = new CopyOnWriteArrayList<>(); + /** * register a unified listener of /${dsRoot}, */ @Override - protected void registerListener() { - - treeCache.getListenable().addListener((client, event) -> { - String path = null == event.getData() ? "" : event.getData().getPath(); - if (path.isEmpty()) { - return; - } - dataChanged(client, event, path); - }); + public void registerListener(AbstractListener abstractListener) { + logger.info("register zookeeper listener: {}", abstractListener.getClass().getName()); + listenerList.add(abstractListener); + listenerList.sort(AbstractListener::compareTo); } @Override @@ -59,25 +57,12 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot()); throw new RuntimeException(e); } - } - - //for sub class - protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){} - - public String getFromCache(final String cachePath, final String key) { - ChildData resultInCache = treeCache.getCurrentData(key); - if (null != resultInCache) { - return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8); - } - return null; - } - - public TreeCache getTreeCache(final String cachePath) { - return treeCache; - } - - public void addListener(TreeCacheListener listener){ - this.treeCache.getListenable().addListener(listener); + treeCache.getListenable().addListener(((client, event) -> { + for (AbstractListener abstractListener : listenerList) { + logger.debug("zookeeperListener:{} triggered", abstractListener.getClass().getName()); + abstractListener.childEvent(client, event); + } + })); } @Override diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java index dbfb8e2c36..5591bb485e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java @@ -30,7 +30,6 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -65,7 +64,8 @@ public class ZookeeperOperator implements InitializingBean { /** * this method is for sub class, */ - protected void registerListener(){} + protected void registerListener(AbstractListener abstractListener) { + } protected void treeCacheStart(){} @@ -143,16 +143,6 @@ public class ZookeeperOperator implements InitializingBean { } } - public boolean hasChildren(final String key) { - Stat stat; - try { - stat = zkClient.checkExists().forPath(key); - return stat.getNumChildren() >= 1; - } catch (Exception ex) { - throw new IllegalStateException(ex); - } - } - public boolean isExisted(final String key) { try { return zkClient.checkExists().forPath(key) != null; @@ -194,28 +184,6 @@ public class ZookeeperOperator implements InitializingBean { } } - public void persistEphemeral(String key, String value, boolean overwrite) { - try { - if (overwrite) { - persistEphemeral(key, value); - } else { - if (!isExisted(key)) { - zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); - } - } - } catch (final Exception ex) { - logger.error("persistEphemeral key : {} , value : {}, overwrite : {}", key, value, overwrite, ex); - } - } - - public void persistEphemeralSequential(final String key, String value) { - try { - zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); - } catch (final Exception ex) { - logger.error("persistEphemeralSequential key : {}", key, ex); - } - } - public void remove(final String key) { try { if (isExisted(key)) { diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java new file mode 100644 index 0000000000..6eb9887b22 --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.zk; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; + +import org.junit.Assert; +import org.junit.Test; + +public class ZookeeperCachedOperatorTest { + + private ZookeeperCachedOperator zookeeperCachedOperator = new ZookeeperCachedOperator(); + + @Test + public void testRegisterListener() { + AbstractListener abstractListener1 = new AbstractListener(1) { + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + // ignore + } + }; + AbstractListener abstractListener2 = new AbstractListener(2) { + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + // ignore + } + }; + zookeeperCachedOperator.registerListener(abstractListener2); + zookeeperCachedOperator.registerListener(abstractListener1); + Assert.assertTrue(true); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 65830b69e7..6c82db9f5d 100644 --- a/pom.xml +++ b/pom.xml @@ -847,6 +847,7 @@ **/service/quartz/cron/CronUtilsTest.java **/service/zk/DefaultEnsembleProviderTest.java **/service/zk/ZKServerTest.java + **/service/zk/ZookeeperCachedOperatorTest.java **/service/zk/CuratorZookeeperClientTest.java **/service/zk/RegisterOperatorTest.java **/service/queue/TaskUpdateQueueTest.java