|
|
|
@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.registry.api.Registry;
|
|
|
|
|
import org.apache.dolphinscheduler.registry.api.RegistryException; |
|
|
|
|
import org.apache.dolphinscheduler.registry.api.SubscribeListener; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.lang3.time.DurationUtils; |
|
|
|
|
import org.apache.curator.framework.CuratorFramework; |
|
|
|
|
import org.apache.curator.framework.CuratorFrameworkFactory; |
|
|
|
|
import org.apache.curator.framework.api.ACLProvider; |
|
|
|
@ -76,8 +77,8 @@ final class ZookeeperRegistry implements Registry {
|
|
|
|
|
.connectString(properties.getConnectString()) |
|
|
|
|
.retryPolicy(retryPolicy) |
|
|
|
|
.namespace(properties.getNamespace()) |
|
|
|
|
.sessionTimeoutMs((int) properties.getSessionTimeout().toMillis()) |
|
|
|
|
.connectionTimeoutMs((int) properties.getConnectionTimeout().toMillis()); |
|
|
|
|
.sessionTimeoutMs(DurationUtils.toMillisInt(properties.getSessionTimeout())) |
|
|
|
|
.connectionTimeoutMs(DurationUtils.toMillisInt(properties.getConnectionTimeout())); |
|
|
|
|
|
|
|
|
|
final String digest = properties.getDigest(); |
|
|
|
|
if (!Strings.isNullOrEmpty(digest)) { |
|
|
|
@ -102,9 +103,10 @@ final class ZookeeperRegistry implements Registry {
|
|
|
|
|
public void start() { |
|
|
|
|
client.start(); |
|
|
|
|
try { |
|
|
|
|
if (!client.blockUntilConnected((int) properties.getBlockUntilConnected().toMillis(), MILLISECONDS)) { |
|
|
|
|
if (!client.blockUntilConnected(DurationUtils.toMillisInt(properties.getBlockUntilConnected()), |
|
|
|
|
MILLISECONDS)) { |
|
|
|
|
client.close(); |
|
|
|
|
throw new RegistryException("zookeeper connect timeout: " + properties.getConnectString()); |
|
|
|
|
throw new RegistryException("zookeeper connect failed in : " + properties.getConnectString() + "ms"); |
|
|
|
|
} |
|
|
|
|
} catch (InterruptedException e) { |
|
|
|
|
Thread.currentThread().interrupt(); |
|
|
|
@ -120,7 +122,7 @@ final class ZookeeperRegistry implements Registry {
|
|
|
|
|
@Override |
|
|
|
|
public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException { |
|
|
|
|
try { |
|
|
|
|
if (!client.blockUntilConnected((int) timeout.toMillis(), MILLISECONDS)) { |
|
|
|
|
if (!client.blockUntilConnected(DurationUtils.toMillisInt(timeout), MILLISECONDS)) { |
|
|
|
|
throw new RegistryException( |
|
|
|
|
String.format("Cannot connect to registry in %s s", timeout.getSeconds())); |
|
|
|
|
} |
|
|
|
|