@ -24,6 +24,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry ;
import org.apache.curator.retry.ExponentialBackoffRetry ;
import org.apache.zookeeper.ZooDefs ;
import org.apache.zookeeper.ZooDefs ;
import org.apache.zookeeper.data.ACL ;
import org.apache.zookeeper.data.ACL ;
import org.slf4j.Logger ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.InitializingBean ;
import org.springframework.beans.factory.InitializingBean ;
@ -32,6 +33,7 @@ import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets ;
import java.nio.charset.StandardCharsets ;
import java.util.List ;
import java.util.List ;
import java.util.concurrent.TimeUnit ;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull ;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull ;
@ -55,9 +57,10 @@ public class CuratorZookeeperClient implements InitializingBean {
}
}
private CuratorFramework buildClient ( ) {
private CuratorFramework buildClient ( ) {
logger . info ( "zookeeper registry center init, server lists is: {}. " , zookeeperConfig . getServerList ( ) ) ;
logger . info ( "zookeeper registry center init, server lists is: [{}] " , zookeeperConfig . getServerList ( ) ) ;
CuratorFrameworkFactory . Builder builder = CuratorFrameworkFactory . builder ( ) . ensembleProvider ( new DefaultEnsembleProvider ( checkNotNull ( zookeeperConfig . getServerList ( ) , "zookeeper quorum can't be null" ) ) )
CuratorFrameworkFactory . Builder builder = CuratorFrameworkFactory . builder ( )
. ensembleProvider ( new DefaultEnsembleProvider ( checkNotNull ( zookeeperConfig . getServerList ( ) , "zookeeper quorum can't be null" ) ) )
. retryPolicy ( new ExponentialBackoffRetry ( zookeeperConfig . getBaseSleepTimeMs ( ) , zookeeperConfig . getMaxRetries ( ) , zookeeperConfig . getMaxSleepMs ( ) ) ) ;
. retryPolicy ( new ExponentialBackoffRetry ( zookeeperConfig . getBaseSleepTimeMs ( ) , zookeeperConfig . getMaxRetries ( ) , zookeeperConfig . getMaxSleepMs ( ) ) ) ;
//these has default value
//these has default value
@ -84,7 +87,9 @@ public class CuratorZookeeperClient implements InitializingBean {
zkClient = builder . build ( ) ;
zkClient = builder . build ( ) ;
zkClient . start ( ) ;
zkClient . start ( ) ;
try {
try {
zkClient . blockUntilConnected ( ) ;
logger . info ( "trying to connect zookeeper server list:{}" , zookeeperConfig . getServerList ( ) ) ;
zkClient . blockUntilConnected ( 30 , TimeUnit . SECONDS ) ;
} catch ( final Exception ex ) {
} catch ( final Exception ex ) {
throw new RuntimeException ( ex ) ;
throw new RuntimeException ( ex ) ;
}
}
@ -95,12 +100,14 @@ public class CuratorZookeeperClient implements InitializingBean {
checkNotNull ( zkClient ) ;
checkNotNull ( zkClient ) ;
zkClient . getConnectionStateListenable ( ) . addListener ( ( client , newState ) - > {
zkClient . getConnectionStateListenable ( ) . addListener ( ( client , newState ) - > {
if ( newState = = ConnectionState . LOST ) {
if ( newState = = ConnectionState . LOST ) {
logger . error ( "connection lost from zookeeper" ) ;
logger . error ( "connection lost from zookeeper" ) ;
} else if ( newState = = ConnectionState . RECONNECTED ) {
} else if ( newState = = ConnectionState . RECONNECTED ) {
logger . info ( "reconnected to zookeeper" ) ;
logger . info ( "reconnected to zookeeper" ) ;
} else if ( newState = = ConnectionState . SUSPENDED ) {
} else if ( newState = = ConnectionState . SUSPENDED ) {
logger . warn ( "connection SUSPENDED to zookeeper" ) ;
logger . warn ( "connection SUSPENDED to zookeeper" ) ;
} else if ( newState = = ConnectionState . CONNECTED ) {
logger . info ( "connected to zookeeper server list:[{}]" , zookeeperConfig . getServerList ( ) ) ;
}
}
} ) ;
} ) ;
}
}