@ -48,6 +48,7 @@ import java.util.HashMap;
import java.util.HashSet ;
import java.util.List ;
import java.util.Map ;
import java.util.Optional ;
import java.util.Set ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.Executors ;
@ -66,9 +67,6 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
/ * *
* server node manager
* /
@Service
public class ServerNodeManager implements InitializingBean {
@ -89,9 +87,6 @@ public class ServerNodeManager implements InitializingBean {
* /
private final ConcurrentHashMap < String , Set < String > > workerGroupNodes = new ConcurrentHashMap < > ( ) ;
/ * *
* master nodes
* /
private final Set < String > masterNodes = new HashSet < > ( ) ;
private final Map < String , WorkerHeartBeat > workerNodeInfo = new HashMap < > ( ) ;
@ -115,35 +110,36 @@ public class ServerNodeManager implements InitializingBean {
@Autowired
private MasterConfig masterConfig ;
private List < WorkerInfoChangeListener > workerInfoChangeListeners = new ArrayList < > ( ) ;
private final List < WorkerInfoChangeListener > workerInfoChangeListeners = new ArrayList < > ( ) ;
private static volatile int MASTER_SLOT = 0 ;
private volatile int currentSlot = 0 ;
private static volatile int MASTER_SIZE = 0 ;
private volatile int totalSlot = 0 ;
public static int getSlot ( ) {
return MASTER_SLOT ;
public int getSlot ( ) {
return currentSlot ;
}
public static int getMasterSize ( ) {
return MASTER_SIZE ;
public int getMasterSize ( ) {
return totalSlot ;
}
/ * *
* init listener
*
* @throws Exception if error throws Exception
* /
@Override
public void afterPropertiesSet ( ) throws Exception {
public void afterPropertiesSet ( ) {
// load nodes from zookeeper
load ( ) ;
updateMasterNodes ( ) ;
updateWorkerNodes ( ) ;
updateWorkerGroupMappings ( ) ;
// init executor service
executorService =
Executors . newSingleThreadScheduledExecutor ( new NamedThreadFactory ( "ServerNodeManagerExecutor" ) ) ;
executorService . scheduleWithFixedDelay ( new WorkerNodeInfoAndGroupDbSyncTask ( ) , 0 , 10 , TimeUnit . SECONDS ) ;
executorService . scheduleWithFixedDelay (
new WorkerNodeInfoAndGroupDbSyncTask ( ) ,
0 ,
masterConfig . getWorkerGroupRefreshInterval ( ) . getSeconds ( ) ,
TimeUnit . SECONDS ) ;
// init MasterNodeListener listener
registryClient . subscribe ( REGISTRY_DOLPHINSCHEDULER_MASTERS , new MasterDataListener ( ) ) ;
@ -152,19 +148,6 @@ public class ServerNodeManager implements InitializingBean {
registryClient . subscribe ( REGISTRY_DOLPHINSCHEDULER_WORKERS , new WorkerDataListener ( ) ) ;
}
/ * *
* load nodes from zookeeper
* /
public void load ( ) {
// master nodes from zookeeper
updateMasterNodes ( ) ;
updateWorkerNodes ( ) ;
updateWorkerGroupMappings ( ) ;
}
/ * *
* worker node info and worker group db sync task
* /
class WorkerNodeInfoAndGroupDbSyncTask implements Runnable {
@Override
@ -251,8 +234,8 @@ public class ServerNodeManager implements InitializingBean {
}
private void updateMasterNodes ( ) {
MASTER_SLOT = 0 ;
MASTER_SIZE = 0 ;
currentSlot = 0 ;
totalSlot = 0 ;
this . masterNodes . clear ( ) ;
String nodeLock = Constants . REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS ;
try {
@ -325,14 +308,12 @@ public class ServerNodeManager implements InitializingBean {
this . masterPriorityQueue . putList ( masterNodes ) ;
int index = masterPriorityQueue . getIndex ( masterConfig . getMasterAddress ( ) ) ;
if ( index > = 0 ) {
MASTER_SIZE = nodes . size ( ) ;
MASTER_SLOT = index ;
totalSlot = nodes . size ( ) ;
currentSlot = index ;
} else {
logger . warn ( "current addr:{} is not in active master list" ,
masterConfig . getMasterAddress ( ) ) ;
logger . warn ( "Current master is not in active master list" ) ;
}
logger . info ( "update master nodes, master size: {}, slot: {}, addr: {}" , MASTER_SIZE ,
MASTER_SLOT , masterConfig . getMasterAddress ( ) ) ;
logger . info ( "Update master nodes, total master size: {}, current slot: {}" , totalSlot , currentSlot ) ;
} finally {
masterLock . unlock ( ) ;
}
@ -360,10 +341,10 @@ public class ServerNodeManager implements InitializingBean {
workerGroup = Constants . DEFAULT_WORKER_GROUP ;
}
Set < String > nodes = workerGroupNodes . get ( workerGroup ) ;
if ( CollectionUtils . isNot Empty ( nodes ) ) {
return Collections . unmodifiableSet ( nodes ) ;
if ( CollectionUtils . isEmpty ( nodes ) ) {
return Collections . emptySet ( ) ;
}
return nodes ;
return Collections . unmodifiableSet ( nodes ) ;
} finally {
workerGroupReadLock . unlock ( ) ;
}
@ -373,45 +354,19 @@ public class ServerNodeManager implements InitializingBean {
return Collections . unmodifiableMap ( workerNodeInfo ) ;
}
/ * *
* get worker node info
*
* @param workerNode worker node
* @return worker node info
* /
public WorkerHeartBeat getWorkerNodeInfo ( String workerNode ) {
public Optional < WorkerHeartBeat > getWorkerNodeInfo ( String workerServerAddress ) {
workerNodeInfoReadLock . lock ( ) ;
try {
return workerNodeInfo . getOrDefault ( workerNode , null ) ;
return Optional . ofNullable ( workerNodeInfo . getOrDefault ( workerServerAddress , null ) ) ;
} finally {
workerNodeInfoReadLock . unlock ( ) ;
}
}
/ * *
* sync worker node info
*
* @param newWorkerNodeInfo new worker node info
* /
private void syncAllWorkerNodeInfo ( Map < String , String > newWorkerNodeInfo ) {
workerNodeInfoWriteLock . lock ( ) ;
try {
workerNodeInfo . clear ( ) ;
for ( Map . Entry < String , String > entry : newWorkerNodeInfo . entrySet ( ) ) {
workerNodeInfo . put ( entry . getKey ( ) , JSONUtils . parseObject ( entry . getValue ( ) , WorkerHeartBeat . class ) ) ;
}
} finally {
workerNodeInfoWriteLock . unlock ( ) ;
}
}
/ * *
* sync single worker node info
* /
private void syncSingleWorkerNodeInfo ( String node , WorkerHeartBeat info ) {
private void syncSingleWorkerNodeInfo ( String workerAddress , WorkerHeartBeat info ) {
workerNodeInfoWriteLock . lock ( ) ;
try {
workerNodeInfo . put ( node , info ) ;
workerNodeInfo . put ( workerAddress , info ) ;
} finally {
workerNodeInfoWriteLock . unlock ( ) ;
}
@ -434,9 +389,6 @@ public class ServerNodeManager implements InitializingBean {
}
}
/ * *
* destroy
* /
@PreDestroy
public void destroy ( ) {
executorService . shutdownNow ( ) ;