From 273cbc53693c9a159e1849342ddefdccabf61c9b Mon Sep 17 00:00:00 2001 From: baoliang Date: Mon, 9 Nov 2020 17:38:45 +0800 Subject: [PATCH] merge from 1.3.3-release --- .../service/zk/ZookeeperCachedOperator.java | 20 ++-- .../service/zk/ZookeeperOperator.java | 93 +++++++++++++++---- 2 files changed, 84 insertions(+), 29 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java index 4fe941503e..647a3c9189 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java @@ -39,14 +39,6 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { */ @Override protected void registerListener() { - treeCache = new TreeCache(getZkClient(), getZookeeperConfig().getDsRoot() + "/nodes"); - logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot()); - try { - treeCache.start(); - } catch (Exception e) { - logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot()); - throw new RuntimeException(e); - } treeCache.getListenable().addListener((client, event) -> { String path = null == event.getData() ? "" : event.getData().getPath(); @@ -55,7 +47,18 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { } dataChanged(client, event, path); }); + } + @Override + protected void treeCacheStart() { + treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + "/nodes"); + logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot()); + try { + treeCache.start(); + } catch (Exception e) { + logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot()); + throw new RuntimeException(e); + } } //for sub class @@ -83,7 +86,6 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { try { Thread.sleep(500); } catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); } super.close(); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java index 7aeb7289c4..e7b049f8bf 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java @@ -50,11 +50,15 @@ public class ZookeeperOperator implements InitializingBean { private final Logger logger = LoggerFactory.getLogger(ZookeeperOperator.class); @Autowired - private CuratorZookeeperClient zookeeperClient; + private ZookeeperConfig zookeeperConfig; + + protected CuratorFramework zkClient; @Override public void afterPropertiesSet() throws Exception { - registerListener(); + this.zkClient = buildClient(); + initStateLister(); + treeCacheStart(); } /** @@ -62,9 +66,62 @@ public class ZookeeperOperator implements InitializingBean { */ protected void registerListener(){} + protected void treeCacheStart(){} + + public void initStateLister() { + checkNotNull(zkClient); + + zkClient.getConnectionStateListenable().addListener((client, newState) -> { + if(newState == ConnectionState.LOST){ + logger.error("connection lost from zookeeper"); + } else if(newState == ConnectionState.RECONNECTED){ + logger.info("reconnected to zookeeper"); + } else if(newState == ConnectionState.SUSPENDED){ + logger.warn("connection SUSPENDED to zookeeper"); + } + }); + } + + private CuratorFramework buildClient() { + logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList()); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null"))) + .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())); + + //these has default value + if (0 != zookeeperConfig.getSessionTimeoutMs()) { + builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs()); + } + if (0 != zookeeperConfig.getConnectionTimeoutMs()) { + builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs()); + } + if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) { + builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() { + + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(final String path) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); + } + zkClient = builder.build(); + zkClient.start(); + try { + zkClient.blockUntilConnected(); + } catch (final Exception ex) { + throw new RuntimeException(ex); + } + return zkClient; + } + public String get(final String key) { try { - return new String(zookeeperClient.getZkClient().getData().forPath(key), StandardCharsets.UTF_8); + return new String(zkClient.getData().forPath(key), StandardCharsets.UTF_8); } catch (Exception ex) { logger.error("get key : {}", key, ex); } @@ -74,7 +131,7 @@ public class ZookeeperOperator implements InitializingBean { public List getChildrenKeys(final String key) { List values; try { - values = zookeeperClient.getZkClient().getChildren().forPath(key); + values = zkClient.getChildren().forPath(key); return values; } catch (InterruptedException ex) { logger.error("getChildrenKeys key : {} InterruptedException", key); @@ -88,7 +145,7 @@ public class ZookeeperOperator implements InitializingBean { public boolean hasChildren(final String key){ Stat stat ; try { - stat = zookeeperClient.getZkClient().checkExists().forPath(key); + stat = zkClient.checkExists().forPath(key); return stat.getNumChildren() >= 1; } catch (Exception ex) { throw new IllegalStateException(ex); @@ -97,7 +154,7 @@ public class ZookeeperOperator implements InitializingBean { public boolean isExisted(final String key) { try { - return zookeeperClient.getZkClient().checkExists().forPath(key) != null; + return zkClient.checkExists().forPath(key) != null; } catch (Exception ex) { logger.error("isExisted key : {}", key, ex); } @@ -107,7 +164,7 @@ public class ZookeeperOperator implements InitializingBean { public void persist(final String key, final String value) { try { if (!isExisted(key)) { - zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8)); } else { update(key, value); } @@ -118,11 +175,7 @@ public class ZookeeperOperator implements InitializingBean { public void update(final String key, final String value) { try { - - CuratorOp check = zookeeperClient.getZkClient().transactionOp().check().forPath(key); - CuratorOp setData = zookeeperClient.getZkClient().transactionOp().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)); - zookeeperClient.getZkClient().transaction().forOperations(check, setData); - + zkClient.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)).and().commit(); } catch (Exception ex) { logger.error("update key : {} , value : {}", key, value, ex); } @@ -132,12 +185,12 @@ public class ZookeeperOperator implements InitializingBean { try { if (isExisted(key)) { try { - zookeeperClient.getZkClient().delete().deletingChildrenIfNeeded().forPath(key); + zkClient.delete().deletingChildrenIfNeeded().forPath(key); } catch (KeeperException.NoNodeException ignore) { //NOP } } - zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); } catch (final Exception ex) { logger.error("persistEphemeral key : {} , value : {}", key, value, ex); } @@ -149,7 +202,7 @@ public class ZookeeperOperator implements InitializingBean { persistEphemeral(key, value); } else { if (!isExisted(key)) { - zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); } } } catch (final Exception ex) { @@ -159,7 +212,7 @@ public class ZookeeperOperator implements InitializingBean { public void persistEphemeralSequential(final String key, String value) { try { - zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); } catch (final Exception ex) { logger.error("persistEphemeralSequential key : {}", key, ex); } @@ -168,7 +221,7 @@ public class ZookeeperOperator implements InitializingBean { public void remove(final String key) { try { if (isExisted(key)) { - zookeeperClient.getZkClient().delete().deletingChildrenIfNeeded().forPath(key); + zkClient.delete().deletingChildrenIfNeeded().forPath(key); } } catch (KeeperException.NoNodeException ignore) { //NOP @@ -178,14 +231,14 @@ public class ZookeeperOperator implements InitializingBean { } public CuratorFramework getZkClient() { - return zookeeperClient.getZkClient(); + return zkClient; } public ZookeeperConfig getZookeeperConfig() { - return zookeeperClient.getZookeeperConfig(); + return zookeeperConfig; } public void close() { - CloseableUtils.closeQuietly(zookeeperClient.getZkClient()); + CloseableUtils.closeQuietly(zkClient); } } \ No newline at end of file