diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java index 7ba704916c..be4754544f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java @@ -81,8 +81,7 @@ public class ZookeeperMonitor extends AbstractZKClient { if(ok){ state.getZookeeperInfo(); } - - String hostName = zookeeperServer; + int connections = state.getConnections(); int watches = state.getWatches(); long sent = state.getSent(); @@ -95,7 +94,7 @@ public class ZookeeperMonitor extends AbstractZKClient { int status = ok ? 1 : 0; Date date = new Date(); - ZookeeperRecord zookeeperRecord = new ZookeeperRecord(hostName,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date); + ZookeeperRecord zookeeperRecord = new ZookeeperRecord(zookeeperServer,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date); list.add(zookeeperRecord); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index 83095a371a..9a3f2a6790 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Test; @@ -43,7 +44,7 @@ import java.util.Set; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class}) public class MasterTaskExecThreadTest { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 8b6392675a..8a7d891c2e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -326,7 +326,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { @Override public String toString() { return "AbstractZKClient{" + - "zkClient=" + zkClient + + "zkClient=" + getZkClient() + ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' + ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' + ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' + diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java new file mode 100644 index 0000000000..c08da0ef72 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.service.zk; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; + +/** + * Shared Curator zookeeper client + */ +@Component +public class CuratorZookeeperClient implements InitializingBean { + private final Logger logger = LoggerFactory.getLogger(CuratorZookeeperClient.class); + + @Autowired + private ZookeeperConfig zookeeperConfig; + + private CuratorFramework zkClient; + + + @Override + public void afterPropertiesSet() throws Exception { + this.zkClient = buildClient(); + initStateLister(); + } + + private CuratorFramework buildClient() { + logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList()); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null"))) + .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())); + + //these has default value + if (0 != zookeeperConfig.getSessionTimeoutMs()) { + builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs()); + } + if (0 != zookeeperConfig.getConnectionTimeoutMs()) { + builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs()); + } + if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) { + builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() { + + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(final String path) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); + } + zkClient = builder.build(); + zkClient.start(); + try { + zkClient.blockUntilConnected(); + } catch (final Exception ex) { + throw new RuntimeException(ex); + } + return zkClient; + } + + public void initStateLister() { + checkNotNull(zkClient); + + zkClient.getConnectionStateListenable().addListener((client, newState) -> { + if(newState == ConnectionState.LOST){ + logger.error("connection lost from zookeeper"); + } else if(newState == ConnectionState.RECONNECTED){ + logger.info("reconnected to zookeeper"); + } else if(newState == ConnectionState.SUSPENDED){ + logger.warn("connection SUSPENDED to zookeeper"); + } + }); + } + + public ZookeeperConfig getZookeeperConfig() { + return zookeeperConfig; + } + + public void setZookeeperConfig(ZookeeperConfig zookeeperConfig) { + this.zookeeperConfig = zookeeperConfig; + } + + public CuratorFramework getZkClient() { + return zkClient; + } +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java index e71cb74e15..3fa47f848c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java @@ -39,7 +39,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { */ @Override protected void registerListener() { - treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + "/nodes"); + treeCache = new TreeCache(getZkClient(), getZookeeperConfig().getDsRoot() + "/nodes"); logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot()); try { treeCache.start(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java index ba3a3bfecb..7aeb7289c4 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java @@ -50,14 +50,10 @@ public class ZookeeperOperator implements InitializingBean { private final Logger logger = LoggerFactory.getLogger(ZookeeperOperator.class); @Autowired - private ZookeeperConfig zookeeperConfig; - - protected CuratorFramework zkClient; + private CuratorZookeeperClient zookeeperClient; @Override public void afterPropertiesSet() throws Exception { - this.zkClient = buildClient(); - initStateLister(); registerListener(); } @@ -66,62 +62,9 @@ public class ZookeeperOperator implements InitializingBean { */ protected void registerListener(){} - public void initStateLister() { - checkNotNull(zkClient); - - zkClient.getConnectionStateListenable().addListener((client, newState) -> { - if(newState == ConnectionState.LOST){ - logger.error("connection lost from zookeeper"); - } else if(newState == ConnectionState.RECONNECTED){ - logger.info("reconnected to zookeeper"); - } else if(newState == ConnectionState.SUSPENDED){ - logger.warn("connection SUSPENDED to zookeeper"); - } - }); - } - - private CuratorFramework buildClient() { - logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList()); - - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null"))) - .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())); - - //these has default value - if (0 != zookeeperConfig.getSessionTimeoutMs()) { - builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs()); - } - if (0 != zookeeperConfig.getConnectionTimeoutMs()) { - builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs()); - } - if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) { - builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() { - - @Override - public List getDefaultAcl() { - return ZooDefs.Ids.CREATOR_ALL_ACL; - } - - @Override - public List getAclForPath(final String path) { - return ZooDefs.Ids.CREATOR_ALL_ACL; - } - }); - } - zkClient = builder.build(); - zkClient.start(); - try { - if (!zkClient.blockUntilConnected(zookeeperConfig.getMaxWaitTime(), TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("Connect zookeeper expire max wait time"); - } - } catch (final Exception ex) { - throw new RuntimeException(ex); - } - return zkClient; - } - public String get(final String key) { try { - return new String(zkClient.getData().forPath(key), StandardCharsets.UTF_8); + return new String(zookeeperClient.getZkClient().getData().forPath(key), StandardCharsets.UTF_8); } catch (Exception ex) { logger.error("get key : {}", key, ex); } @@ -131,7 +74,7 @@ public class ZookeeperOperator implements InitializingBean { public List getChildrenKeys(final String key) { List values; try { - values = zkClient.getChildren().forPath(key); + values = zookeeperClient.getZkClient().getChildren().forPath(key); return values; } catch (InterruptedException ex) { logger.error("getChildrenKeys key : {} InterruptedException", key); @@ -145,7 +88,7 @@ public class ZookeeperOperator implements InitializingBean { public boolean hasChildren(final String key){ Stat stat ; try { - stat = zkClient.checkExists().forPath(key); + stat = zookeeperClient.getZkClient().checkExists().forPath(key); return stat.getNumChildren() >= 1; } catch (Exception ex) { throw new IllegalStateException(ex); @@ -154,7 +97,7 @@ public class ZookeeperOperator implements InitializingBean { public boolean isExisted(final String key) { try { - return zkClient.checkExists().forPath(key) != null; + return zookeeperClient.getZkClient().checkExists().forPath(key) != null; } catch (Exception ex) { logger.error("isExisted key : {}", key, ex); } @@ -164,7 +107,7 @@ public class ZookeeperOperator implements InitializingBean { public void persist(final String key, final String value) { try { if (!isExisted(key)) { - zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8)); } else { update(key, value); } @@ -176,9 +119,9 @@ public class ZookeeperOperator implements InitializingBean { public void update(final String key, final String value) { try { - CuratorOp check = zkClient.transactionOp().check().forPath(key); - CuratorOp setData = zkClient.transactionOp().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)); - zkClient.transaction().forOperations(check, setData); + CuratorOp check = zookeeperClient.getZkClient().transactionOp().check().forPath(key); + CuratorOp setData = zookeeperClient.getZkClient().transactionOp().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)); + zookeeperClient.getZkClient().transaction().forOperations(check, setData); } catch (Exception ex) { logger.error("update key : {} , value : {}", key, value, ex); @@ -189,12 +132,12 @@ public class ZookeeperOperator implements InitializingBean { try { if (isExisted(key)) { try { - zkClient.delete().deletingChildrenIfNeeded().forPath(key); + zookeeperClient.getZkClient().delete().deletingChildrenIfNeeded().forPath(key); } catch (KeeperException.NoNodeException ignore) { //NOP } } - zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); } catch (final Exception ex) { logger.error("persistEphemeral key : {} , value : {}", key, value, ex); } @@ -206,7 +149,7 @@ public class ZookeeperOperator implements InitializingBean { persistEphemeral(key, value); } else { if (!isExisted(key)) { - zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); } } } catch (final Exception ex) { @@ -216,7 +159,7 @@ public class ZookeeperOperator implements InitializingBean { public void persistEphemeralSequential(final String key, String value) { try { - zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); } catch (final Exception ex) { logger.error("persistEphemeralSequential key : {}", key, ex); } @@ -225,7 +168,7 @@ public class ZookeeperOperator implements InitializingBean { public void remove(final String key) { try { if (isExisted(key)) { - zkClient.delete().deletingChildrenIfNeeded().forPath(key); + zookeeperClient.getZkClient().delete().deletingChildrenIfNeeded().forPath(key); } } catch (KeeperException.NoNodeException ignore) { //NOP @@ -235,14 +178,14 @@ public class ZookeeperOperator implements InitializingBean { } public CuratorFramework getZkClient() { - return zkClient; + return zookeeperClient.getZkClient(); } public ZookeeperConfig getZookeeperConfig() { - return zookeeperConfig; + return zookeeperClient.getZookeeperConfig(); } public void close() { - CloseableUtils.closeQuietly(zkClient); + CloseableUtils.closeQuietly(zookeeperClient.getZkClient()); } } \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java new file mode 100644 index 0000000000..c0297799ea --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.service.zk; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class CuratorZookeeperClientTest { + private static ZKServer zkServer; + + @Before + public void before() throws IOException { + new Thread(() -> { + if (zkServer == null) { + zkServer = new ZKServer(); + } + zkServer.startLocalZkServer(2185); + }).start(); + } + + @After + public void after() { + if (zkServer != null) { + zkServer.stop(); + } + } + + @Test + public void testAfterPropertiesSet() throws Exception { + TimeUnit.SECONDS.sleep(10); + CuratorZookeeperClient zookeeperClient = new CuratorZookeeperClient(); + ZookeeperConfig zookeeperConfig = new ZookeeperConfig(); + zookeeperConfig.setServerList("127.0.0.1:2185"); + zookeeperConfig.setBaseSleepTimeMs(100); + zookeeperConfig.setMaxSleepMs(30000); + zookeeperConfig.setMaxRetries(10); + zookeeperConfig.setSessionTimeoutMs(60000); + zookeeperConfig.setConnectionTimeoutMs(30000); + zookeeperConfig.setDigest(" "); + zookeeperConfig.setDsRoot("/dolphinscheduler"); + zookeeperConfig.setMaxWaitTime(30000); + zookeeperClient.setZookeeperConfig(zookeeperConfig); + System.out.println("start"); + zookeeperClient.afterPropertiesSet(); + System.out.println("end"); + Assert.assertNotNull(zookeeperClient.getZkClient()); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4103c75ddb..71f8de9e1b 100644 --- a/pom.xml +++ b/pom.xml @@ -832,6 +832,7 @@ **/service/quartz/cron/CronUtilsTest.java **/service/zk/DefaultEnsembleProviderTest.java **/service/zk/ZKServerTest.java + **/service/zk/CuratorZookeeperClientTest.java **/service/queue/TaskUpdateQueueTest.java **/dao/mapper/DataSourceUserMapperTest.java