@ -16,6 +16,7 @@
* /
package org.apache.dolphinscheduler.api.utils ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
@ -43,6 +44,8 @@ public class ZooKeeperState {
private int watches = - 1 ;
private int connections = - 1 ;
private boolean healthFlag = false ;
public ZooKeeperState ( String connectionString ) {
String host = connectionString . substring ( 0 ,
connectionString . indexOf ( ':' ) ) ;
@ -55,55 +58,68 @@ public class ZooKeeperState {
public void getZookeeperInfo ( ) {
String content = cmd ( "srvr" ) ;
if ( StringUtils . isNotBlank ( content ) ) {
try ( Scanner scannerForStat = new Scanner ( content ) ) {
while ( scannerForStat . hasNext ( ) ) {
String line = scannerForStat . nextLine ( ) ;
if ( line . startsWith ( "Latency min/avg/max:" ) ) {
String [ ] latencys = getStringValueFromLine ( line ) . split ( "/" ) ;
minLatency = Float . parseFloat ( latencys [ 0 ] ) ;
avgLatency = Float . parseFloat ( latencys [ 1 ] ) ;
maxLatency = Float . parseFloat ( latencys [ 2 ] ) ;
} else if ( line . startsWith ( "Received:" ) ) {
received = Long . parseLong ( getStringValueFromLine ( line ) ) ;
} else if ( line . startsWith ( "Sent:" ) ) {
sent = Long . parseLong ( getStringValueFromLine ( line ) ) ;
} else if ( line . startsWith ( "Outstanding:" ) ) {
outStanding = Integer . parseInt ( getStringValueFromLine ( line ) ) ;
} else if ( line . startsWith ( "Zxid:" ) ) {
zxid = Long . parseLong ( getStringValueFromLine ( line ) . substring ( 2 ) , 16 ) ;
} else if ( line . startsWith ( "Mode:" ) ) {
mode = getStringValueFromLine ( line ) ;
} else if ( line . startsWith ( "Node count:" ) ) {
nodeCount = Integer . parseInt ( getStringValueFromLine ( line ) ) ;
if ( Constants . STRING_FALSE . equals ( content ) ) {
healthFlag = true ;
} else {
try ( Scanner scannerForStat = new Scanner ( content ) ) {
while ( scannerForStat . hasNext ( ) ) {
String line = scannerForStat . nextLine ( ) ;
if ( line . startsWith ( "Latency min/avg/max:" ) ) {
String [ ] latencys = getStringValueFromLine ( line ) . split ( "/" ) ;
minLatency = Float . parseFloat ( latencys [ 0 ] ) ;
avgLatency = Float . parseFloat ( latencys [ 1 ] ) ;
maxLatency = Float . parseFloat ( latencys [ 2 ] ) ;
} else if ( line . startsWith ( "Received:" ) ) {
received = Long . parseLong ( getStringValueFromLine ( line ) ) ;
} else if ( line . startsWith ( "Sent:" ) ) {
sent = Long . parseLong ( getStringValueFromLine ( line ) ) ;
} else if ( line . startsWith ( "Outstanding:" ) ) {
outStanding = Integer . parseInt ( getStringValueFromLine ( line ) ) ;
} else if ( line . startsWith ( "Zxid:" ) ) {
zxid = Long . parseLong ( getStringValueFromLine ( line ) . substring ( 2 ) , 16 ) ;
} else if ( line . startsWith ( "Mode:" ) ) {
mode = getStringValueFromLine ( line ) ;
} else if ( line . startsWith ( "Node count:" ) ) {
nodeCount = Integer . parseInt ( getStringValueFromLine ( line ) ) ;
}
}
}
}
}
}
String wchsText = cmd ( "wchs" ) ;
if ( StringUtils . isNotBlank ( wchsText ) ) {
try ( Scanner scannerForWchs = new Scanner ( wchsText ) ) {
while ( scannerForWchs . hasNext ( ) ) {
String line = scannerForWchs . nextLine ( ) ;
if ( line . startsWith ( "Total watches:" ) ) {
watches = Integer . parseInt ( getStringValueFromLine ( line ) ) ;
if ( Constants . STRING_FALSE . equals ( wchsText ) ) {
healthFlag = true ;
} else {
try ( Scanner scannerForWchs = new Scanner ( wchsText ) ) {
while ( scannerForWchs . hasNext ( ) ) {
String line = scannerForWchs . nextLine ( ) ;
if ( line . startsWith ( "Total watches:" ) ) {
watches = Integer . parseInt ( getStringValueFromLine ( line ) ) ;
}
}
}
}
}
}
String consText = cmd ( "cons" ) ;
if ( StringUtils . isNotBlank ( consText ) ) {
Scanner scannerForCons = new Scanner ( consText ) ;
if ( StringUtils . isNotBlank ( consText ) ) {
connections = 0 ;
}
while ( scannerForCons . hasNext ( ) ) {
@SuppressWarnings ( "unused" )
String line = scannerForCons . nextLine ( ) ;
+ + connections ;
if ( Constants . STRING_FALSE . equals ( consText ) ) {
healthFlag = true ;
} else {
Scanner scannerForCons = new Scanner ( consText ) ;
if ( StringUtils . isNotBlank ( consText ) ) {
connections = 0 ;
}
while ( scannerForCons . hasNext ( ) ) {
@SuppressWarnings ( "unused" )
String line = scannerForCons . nextLine ( ) ;
+ + connections ;
}
scannerForCons . close ( ) ;
}
scannerForCons . close ( ) ;
}
}
@ -121,7 +137,7 @@ public class ZooKeeperState {
private class SendThread extends Thread {
private String cmd ;
private String ret = "" ;
private String ret = Constants . STRING_FALSE ;
public SendThread ( String cmd ) {
this . cmd = cmd ;
@ -150,7 +166,7 @@ public class ZooKeeperState {
} catch ( InterruptedException e ) {
logger . error ( "send " + cmd + " to server " + host + ":" + port + " failed!" , e ) ;
}
return "" ;
return Constants . STRING_FALSE ;
}
public Logger getLogger ( ) {
@ -209,6 +225,10 @@ public class ZooKeeperState {
return connections ;
}
public boolean isHealthFlag ( ) {
return healthFlag ;
}
@Override
public String toString ( ) {
return "ZooKeeperState [host=" + host + ", port=" + port
@ -217,7 +237,7 @@ public class ZooKeeperState {
+ ", sent=" + sent + ", outStanding=" + outStanding + ", zxid="
+ zxid + ", mode=" + mode + ", nodeCount=" + nodeCount
+ ", watches=" + watches + ", connections="
+ connections + "]" ;
+ connections + ",healthFlag=" + healthFlag + " ]" ;
}