Browse Source

merge from 1.3.3-release

pull/3/MERGE
baoliang 4 years ago
parent
commit
273cbc5369
  1. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
  2. 93
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

@ -39,14 +39,6 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
*/
@Override
protected void registerListener() {
treeCache = new TreeCache(getZkClient(), getZookeeperConfig().getDsRoot() + "/nodes");
logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
try {
treeCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
throw new RuntimeException(e);
}
treeCache.getListenable().addListener((client, event) -> {
String path = null == event.getData() ? "" : event.getData().getPath();
@ -55,7 +47,18 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
}
dataChanged(client, event, path);
});
}
@Override
protected void treeCacheStart() {
treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + "/nodes");
logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
try {
treeCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
throw new RuntimeException(e);
}
}
//for sub class
@ -83,7 +86,6 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
try {
Thread.sleep(500);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
super.close();
}

93
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -50,11 +50,15 @@ public class ZookeeperOperator implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ZookeeperOperator.class);
@Autowired
private CuratorZookeeperClient zookeeperClient;
private ZookeeperConfig zookeeperConfig;
protected CuratorFramework zkClient;
@Override
public void afterPropertiesSet() throws Exception {
registerListener();
this.zkClient = buildClient();
initStateLister();
treeCacheStart();
}
/**
@ -62,9 +66,62 @@ public class ZookeeperOperator implements InitializingBean {
*/
protected void registerListener(){}
protected void treeCacheStart(){}
public void initStateLister() {
checkNotNull(zkClient);
zkClient.getConnectionStateListenable().addListener((client, newState) -> {
if(newState == ConnectionState.LOST){
logger.error("connection lost from zookeeper");
} else if(newState == ConnectionState.RECONNECTED){
logger.info("reconnected to zookeeper");
} else if(newState == ConnectionState.SUSPENDED){
logger.warn("connection SUSPENDED to zookeeper");
}
});
}
private CuratorFramework buildClient() {
logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null")))
.retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
//these has default value
if (0 != zookeeperConfig.getSessionTimeoutMs()) {
builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs());
}
if (0 != zookeeperConfig.getConnectionTimeoutMs()) {
builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs());
}
if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) {
builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
zkClient = builder.build();
zkClient.start();
try {
zkClient.blockUntilConnected();
} catch (final Exception ex) {
throw new RuntimeException(ex);
}
return zkClient;
}
public String get(final String key) {
try {
return new String(zookeeperClient.getZkClient().getData().forPath(key), StandardCharsets.UTF_8);
return new String(zkClient.getData().forPath(key), StandardCharsets.UTF_8);
} catch (Exception ex) {
logger.error("get key : {}", key, ex);
}
@ -74,7 +131,7 @@ public class ZookeeperOperator implements InitializingBean {
public List<String> getChildrenKeys(final String key) {
List<String> values;
try {
values = zookeeperClient.getZkClient().getChildren().forPath(key);
values = zkClient.getChildren().forPath(key);
return values;
} catch (InterruptedException ex) {
logger.error("getChildrenKeys key : {} InterruptedException", key);
@ -88,7 +145,7 @@ public class ZookeeperOperator implements InitializingBean {
public boolean hasChildren(final String key){
Stat stat ;
try {
stat = zookeeperClient.getZkClient().checkExists().forPath(key);
stat = zkClient.checkExists().forPath(key);
return stat.getNumChildren() >= 1;
} catch (Exception ex) {
throw new IllegalStateException(ex);
@ -97,7 +154,7 @@ public class ZookeeperOperator implements InitializingBean {
public boolean isExisted(final String key) {
try {
return zookeeperClient.getZkClient().checkExists().forPath(key) != null;
return zkClient.checkExists().forPath(key) != null;
} catch (Exception ex) {
logger.error("isExisted key : {}", key, ex);
}
@ -107,7 +164,7 @@ public class ZookeeperOperator implements InitializingBean {
public void persist(final String key, final String value) {
try {
if (!isExisted(key)) {
zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
} else {
update(key, value);
}
@ -118,11 +175,7 @@ public class ZookeeperOperator implements InitializingBean {
public void update(final String key, final String value) {
try {
CuratorOp check = zookeeperClient.getZkClient().transactionOp().check().forPath(key);
CuratorOp setData = zookeeperClient.getZkClient().transactionOp().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8));
zookeeperClient.getZkClient().transaction().forOperations(check, setData);
zkClient.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)).and().commit();
} catch (Exception ex) {
logger.error("update key : {} , value : {}", key, value, ex);
}
@ -132,12 +185,12 @@ public class ZookeeperOperator implements InitializingBean {
try {
if (isExisted(key)) {
try {
zookeeperClient.getZkClient().delete().deletingChildrenIfNeeded().forPath(key);
zkClient.delete().deletingChildrenIfNeeded().forPath(key);
} catch (KeeperException.NoNodeException ignore) {
//NOP
}
}
zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
} catch (final Exception ex) {
logger.error("persistEphemeral key : {} , value : {}", key, value, ex);
}
@ -149,7 +202,7 @@ public class ZookeeperOperator implements InitializingBean {
persistEphemeral(key, value);
} else {
if (!isExisted(key)) {
zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
}
}
} catch (final Exception ex) {
@ -159,7 +212,7 @@ public class ZookeeperOperator implements InitializingBean {
public void persistEphemeralSequential(final String key, String value) {
try {
zookeeperClient.getZkClient().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
} catch (final Exception ex) {
logger.error("persistEphemeralSequential key : {}", key, ex);
}
@ -168,7 +221,7 @@ public class ZookeeperOperator implements InitializingBean {
public void remove(final String key) {
try {
if (isExisted(key)) {
zookeeperClient.getZkClient().delete().deletingChildrenIfNeeded().forPath(key);
zkClient.delete().deletingChildrenIfNeeded().forPath(key);
}
} catch (KeeperException.NoNodeException ignore) {
//NOP
@ -178,14 +231,14 @@ public class ZookeeperOperator implements InitializingBean {
}
public CuratorFramework getZkClient() {
return zookeeperClient.getZkClient();
return zkClient;
}
public ZookeeperConfig getZookeeperConfig() {
return zookeeperClient.getZookeeperConfig();
return zookeeperConfig;
}
public void close() {
CloseableUtils.closeQuietly(zookeeperClient.getZkClient());
CloseableUtils.closeQuietly(zkClient);
}
}
Loading…
Cancel
Save