@ -17,63 +17,103 @@
package org.apache.dolphinscheduler.server.registry ;
import org.apache.commons.collections.CollectionUtils ;
import org.apache.curator.framework.CuratorFramework ;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.ZKNodeType ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.dao.AlertDao ;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup ;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper ;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory ;
import org.apache.dolphinscheduler.service.zk.AbstractListener ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.InitializingBean ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient ;
import org.apache.commons.collections.CollectionUtils ;
import org.apache.curator.framework.CuratorFramework ;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent ;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.Executors ;
import java.util.concurrent.ScheduledExecutorService ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP ;
import javax.annotation.PreDestroy ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.InitializingBean ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Component ;
import org.springframework.stereotype.Service ;
/ * *
* zookeeper node manager
* serv er node manager
* /
@Service
public class ZookeeperNodeManager implements InitializingBean {
public class Serv erNodeManager implements InitializingBean {
private final Logger logger = LoggerFactory . getLogger ( Zookeep erNodeManager. class ) ;
private final Logger logger = LoggerFactory . getLogger ( Serv erNodeManager. class ) ;
/ * *
* master lock
* master lock
* /
private final Lock masterLock = new ReentrantLock ( ) ;
/ * *
* worker group lock
* worker group lock
* /
private final Lock workerGroupLock = new ReentrantLock ( ) ;
/ * *
* worker group nodes
* worker node info lock
* /
private final Lock workerNodeInfoLock = new ReentrantLock ( ) ;
/ * *
* worker group nodes
* /
private final ConcurrentHashMap < String , Set < String > > workerGroupNodes = new ConcurrentHashMap < > ( ) ;
/ * *
* master nodes
* master nodes
* /
private final Set < String > masterNodes = new HashSet < > ( ) ;
/ * *
* worker node info
* /
private final Map < String , String > workerNodeInfo = new HashMap < > ( ) ;
/ * *
* executor service
* /
private ScheduledExecutorService executorService ;
/ * *
* zk client
* /
@Autowired
private ZKClient zkClient ;
/ * *
* zookeeper registry center
* /
@Autowired
private ZookeeperRegistryCenter registryCenter ;
/ * *
* worker group mapper
* /
@Autowired
private WorkerGroupMapper workerGroupMapper ;
/ * *
* alert dao
* /
@ -87,9 +127,14 @@ public class ZookeeperNodeManager implements InitializingBean {
@Override
public void afterPropertiesSet ( ) throws Exception {
/ * *
* load nodes from zookeeper
* load nodes from zookeeper
* /
load ( ) ;
/ * *
* init executor service
* /
executorService = Executors . newSingleThreadScheduledExecutor ( new NamedThreadFactory ( "ServerNodeManagerExecutor" ) ) ;
executorService . scheduleWithFixedDelay ( new WorkerNodeInfoAndGroupDbSyncTask ( ) , 0 , 10 , TimeUnit . SECONDS ) ;
/ * *
* init MasterNodeListener listener
* /
@ -103,22 +148,59 @@ public class ZookeeperNodeManager implements InitializingBean {
/ * *
* load nodes from zookeeper
* /
private void load ( ) {
private void load ( ) {
/ * *
* master nodes from zookeeper
* /
Set < String > m asterNodes = registryCenter . getMasterNodesDirectly ( ) ;
syncMasterNodes ( m asterNodes) ;
Set < String > initM asterNodes = registryCenter . getMasterNodesDirectly ( ) ;
syncMasterNodes ( initM asterNodes) ;
/ * *
* worker group nodes from zookeeper
* /
Set < String > workerGroups = registryCenter . getWorkerGroupDirectly ( ) ;
for ( String workerGroup : workerGroups ) {
for ( String workerGroup : workerGroups ) {
syncWorkerGroupNodes ( workerGroup , registryCenter . getWorkerGroupNodesDirectly ( workerGroup ) ) ;
}
}
/ * *
* zookeeper client
* /
@Component
static class ZKClient extends AbstractZKClient { }
/ * *
* worker node info and worker group db sync task
* /
class WorkerNodeInfoAndGroupDbSyncTask implements Runnable {
@Override
public void run ( ) {
// sync worker node info
Map < String , String > newWorkerNodeInfo = zkClient . getServerMaps ( ZKNodeType . WORKER , true ) ;
syncWorkerNodeInfo ( newWorkerNodeInfo ) ;
// sync worker group nodes from database
List < WorkerGroup > workerGroupList = workerGroupMapper . queryAllWorkerGroup ( ) ;
if ( CollectionUtils . isNotEmpty ( workerGroupList ) ) {
for ( WorkerGroup wg : workerGroupList ) {
String workerGroup = wg . getName ( ) ;
Set < String > nodes = new HashSet < > ( ) ;
String [ ] addrs = wg . getAddrList ( ) . split ( Constants . COMMA ) ;
for ( String addr : addrs ) {
if ( newWorkerNodeInfo . containsKey ( addr ) ) {
nodes . add ( addr ) ;
}
}
if ( ! nodes . isEmpty ( ) ) {
syncWorkerGroupNodes ( workerGroup , nodes ) ;
}
}
}
}
}
/ * *
* worker group node listener
* /
@ -126,7 +208,7 @@ public class ZookeeperNodeManager implements InitializingBean {
@Override
protected void dataChanged ( CuratorFramework client , TreeCacheEvent event , String path ) {
if ( registryCenter . isWorkerPath ( path ) ) {
if ( registryCenter . isWorkerPath ( path ) ) {
try {
if ( event . getType ( ) = = TreeCacheEvent . Type . NODE_ADDED ) {
logger . info ( "worker group node : {} added." , path ) ;
@ -141,25 +223,23 @@ public class ZookeeperNodeManager implements InitializingBean {
syncWorkerGroupNodes ( group , currentNodes ) ;
alertDao . sendServerStopedAlert ( 1 , path , "WORKER" ) ;
}
} catch ( IllegalArgumentException ignor e) {
logger . warn ( ignor e. getMessage ( ) ) ;
} catch ( IllegalArgumentException ex ) {
logger . warn ( ex . getMessage ( ) ) ;
} catch ( Exception ex ) {
logger . error ( "WorkerGroupListener capture data change and get data failed" , ex ) ;
}
}
}
private String parseGroup ( String path ) {
String [ ] parts = path . split ( "\\ /" ) ;
private String parseGroup ( String path ) {
String [ ] parts = path . split ( "/" ) ;
if ( parts . length < 6 ) {
throw new IllegalArgumentException ( String . format ( "worker group path : %s is not valid, ignore" , path ) ) ;
}
String group = parts [ parts . length - 2 ] ;
return group ;
return parts [ parts . length - 2 ] ;
}
}
/ * *
* master node listener
* /
@ -203,7 +283,7 @@ public class ZookeeperNodeManager implements InitializingBean {
* sync master nodes
* @param nodes master nodes
* /
private void syncMasterNodes ( Set < String > nodes ) {
private void syncMasterNodes ( Set < String > nodes ) {
masterLock . lock ( ) ;
try {
masterNodes . clear ( ) ;
@ -218,7 +298,7 @@ public class ZookeeperNodeManager implements InitializingBean {
* @param workerGroup worker group
* @param nodes worker nodes
* /
private void syncWorkerGroupNodes ( String workerGroup , Set < String > nodes ) {
private void syncWorkerGroupNodes ( String workerGroup , Set < String > nodes ) {
workerGroupLock . lock ( ) ;
try {
workerGroup = workerGroup . toLowerCase ( ) ;
@ -231,7 +311,7 @@ public class ZookeeperNodeManager implements InitializingBean {
}
}
public Map < String , Set < String > > getWorkerGroupNodes ( ) {
public Map < String , Set < String > > getWorkerGroupNodes ( ) {
return Collections . unmodifiableMap ( workerGroupNodes ) ;
}
@ -240,15 +320,15 @@ public class ZookeeperNodeManager implements InitializingBean {
* @param workerGroup workerGroup
* @return worker nodes
* /
public Set < String > getWorkerGroupNodes ( String workerGroup ) {
public Set < String > getWorkerGroupNodes ( String workerGroup ) {
workerGroupLock . lock ( ) ;
try {
if ( StringUtils . isEmpty ( workerGroup ) ) {
workerGroup = DEFAULT_WORKER_GROUP ;
if ( StringUtils . isEmpty ( workerGroup ) ) {
workerGroup = Constants . DEFAULT_WORKER_GROUP ;
}
workerGroup = workerGroup . toLowerCase ( ) ;
Set < String > nodes = workerGroupNodes . get ( workerGroup ) ;
if ( CollectionUtils . isNotEmpty ( nodes ) ) {
if ( CollectionUtils . isNotEmpty ( nodes ) ) {
return Collections . unmodifiableSet ( nodes ) ;
}
return nodes ;
@ -258,9 +338,48 @@ public class ZookeeperNodeManager implements InitializingBean {
}
/ * *
* close
* get worker node info
* @return worker node info
* /
public Map < String , String > getWorkerNodeInfo ( ) {
return Collections . unmodifiableMap ( workerNodeInfo ) ;
}
/ * *
* get worker node info
* @param workerNode worker node
* @return worker node info
* /
public String getWorkerNodeInfo ( String workerNode ) {
workerNodeInfoLock . lock ( ) ;
try {
return workerNodeInfo . getOrDefault ( workerNode , null ) ;
} finally {
workerNodeInfoLock . unlock ( ) ;
}
}
/ * *
* sync worker node info
* @param newWorkerNodeInfo new worker node info
* /
public void close ( ) {
private void syncWorkerNodeInfo ( Map < String , String > newWorkerNodeInfo ) {
workerNodeInfoLock . lock ( ) ;
try {
workerNodeInfo . clear ( ) ;
workerNodeInfo . putAll ( newWorkerNodeInfo ) ;
} finally {
workerNodeInfoLock . unlock ( ) ;
}
}
/ * *
* destroy
* /
@PreDestroy
public void destroy ( ) {
executorService . shutdownNow ( ) ;
registryCenter . close ( ) ;
}
}