@ -17,34 +17,34 @@
package org.apache.dolphinscheduler.api.service.impl ;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP ;
import static org.apache.dolphinscheduler.common.Constants.SLASH ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.service.WorkerGroupService ;
import org.apache.dolphinscheduler.api.utils.PageInfo ;
import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.ZKNodeType ;
import org.apache.dolphinscheduler.common.utils.CollectionUtils ;
import org.apache.dolphinscheduler.common.utils.DateUtils ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.User ;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup ;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper ;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper ;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator ;
import java.util.ArrayList ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import java.util.stream.Collectors ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import org.springframework.transaction.annotation.Transactional ;
/ * *
* worker group service impl
@ -54,12 +54,115 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
private static final Logger logger = LoggerFactory . getLogger ( WorkerGroupServiceImpl . class ) ;
@Autowired
WorkerGroupMapper workerGroupMapper ;
@Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator ;
@Autowired
private ZookeeperMonitor zookeeperMonitor ;
@Autowired
ProcessInstanceMapper processInstanceMapper ;
/ * *
* create or update a worker group
*
* @param loginUser login user
* @param id worker group id
* @param name worker group name
* @param addrList addr list
* @return create or update result code
* /
@Override
public Map < String , Object > saveWorkerGroup ( User loginUser , int id , String name , String addrList ) {
Map < String , Object > result = new HashMap < > ( ) ;
if ( isNotAdmin ( loginUser , result ) ) {
return result ;
}
if ( Constants . DOCKER_MODE & & ! Constants . KUBERNETES_MODE ) {
putMsg ( result , Status . CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER ) ;
return result ;
}
if ( StringUtils . isEmpty ( name ) ) {
putMsg ( result , Status . NAME_NULL ) ;
return result ;
}
Date now = new Date ( ) ;
WorkerGroup workerGroup ;
if ( id ! = 0 ) {
workerGroup = workerGroupMapper . selectById ( id ) ;
// check exist
if ( workerGroup = = null ) {
workerGroup = new WorkerGroup ( ) ;
workerGroup . setCreateTime ( now ) ;
}
} else {
workerGroup = new WorkerGroup ( ) ;
workerGroup . setCreateTime ( now ) ;
}
workerGroup . setName ( name ) ;
workerGroup . setAddrList ( addrList ) ;
workerGroup . setUpdateTime ( now ) ;
if ( checkWorkerGroupNameExists ( workerGroup ) ) {
putMsg ( result , Status . NAME_EXIST , workerGroup . getName ( ) ) ;
return result ;
}
String invalidAddr = checkWorkerGroupAddrList ( workerGroup ) ;
if ( invalidAddr ! = null ) {
putMsg ( result , Status . WORKER_ADDRESS_INVALID , invalidAddr ) ;
return result ;
}
if ( workerGroup . getId ( ) ! = 0 ) {
workerGroupMapper . updateById ( workerGroup ) ;
} else {
workerGroupMapper . insert ( workerGroup ) ;
}
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
/ * *
* check worker group name exists
* @param workerGroup worker group
* @return boolean
* /
private boolean checkWorkerGroupNameExists ( WorkerGroup workerGroup ) {
List < WorkerGroup > workerGroupList = workerGroupMapper . queryWorkerGroupByName ( workerGroup . getName ( ) ) ;
if ( CollectionUtils . isNotEmpty ( workerGroupList ) ) {
// new group has same name
if ( workerGroup . getId ( ) = = 0 ) {
return true ;
}
// check group id
for ( WorkerGroup group : workerGroupList ) {
if ( group . getId ( ) ! = workerGroup . getId ( ) ) {
return true ;
}
}
}
// check zookeeper
String workerGroupPath = zookeeperCachedOperator . getZookeeperConfig ( ) . getDsRoot ( ) + Constants . ZOOKEEPER_DOLPHINSCHEDULER_WORKERS + Constants . SLASH + workerGroup . getName ( ) ;
return zookeeperCachedOperator . isExisted ( workerGroupPath ) ;
}
/ * *
* check worker group addr list
* @param workerGroup worker group
* @return boolean
* /
private String checkWorkerGroupAddrList ( WorkerGroup workerGroup ) {
Map < String , String > serverMaps = zookeeperMonitor . getServerMaps ( ZKNodeType . WORKER , true ) ;
for ( String addr : workerGroup . getAddrList ( ) . split ( Constants . COMMA ) ) {
if ( ! serverMaps . containsKey ( addr ) ) {
return addr ;
}
}
return null ;
}
/ * *
* query worker group paging
*
@ -82,7 +185,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
}
List < WorkerGroup > workerGroups = getWorkerGroups ( true ) ;
List < WorkerGroup > resultDataList = new ArrayList < > ( ) ;
if ( CollectionUtils . isNotEmpty ( workerGroups ) ) {
@ -98,10 +200,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
searchValDataList = workerGroups ;
}
if ( searchValDataList . size ( ) < pageSize ) {
toIndex = ( pageNo - 1 ) * pageSize + searchValDataList . size ( ) ;
if ( fromIndex < searchValDataList . size ( ) ) {
if ( toIndex > searchValDataList . size ( ) ) {
toIndex = searchValDataList . size ( ) ;
}
resultDataList = searchValDataList . subList ( fromIndex , toIndex ) ;
}
resultDataList = searchValDataList . subList ( fromIndex , toIndex ) ;
}
PageInfo < WorkerGroup > pageInfo = new PageInfo < > ( pageNo , pageSize ) ;
@ -121,13 +225,16 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Override
public Map < String , Object > queryAllGroup ( ) {
Map < String , Object > result = new HashMap < > ( ) ;
List < WorkerGroup > workerGroups = getWorkerGroups ( false ) ;
Set < String > availableWorkerGroupSet = workerGroups . stream ( )
List < String > availableWorkerGroupList = workerGroups . stream ( )
. map ( WorkerGroup : : getName )
. collect ( Collectors . toSet ( ) ) ;
result . put ( Constants . DATA_LIST , availableWorkerGroupSet ) ;
. collect ( Collectors . toList ( ) ) ;
int index = availableWorkerGroupList . indexOf ( Constants . DEFAULT_WORKER_GROUP ) ;
if ( index > - 1 ) {
availableWorkerGroupList . remove ( index ) ;
availableWorkerGroupList . add ( 0 , Constants . DEFAULT_WORKER_GROUP ) ;
}
result . put ( Constants . DATA_LIST , availableWorkerGroupList ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
@ -139,8 +246,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return WorkerGroup list
* /
private List < WorkerGroup > getWorkerGroups ( boolean isPaging ) {
// worker groups from database
List < WorkerGroup > workerGroups = workerGroupMapper . queryAllWorkerGroup ( ) ;
// worker groups from zookeeper
String workerPath = zookeeperCachedOperator . getZookeeperConfig ( ) . getDsRoot ( ) + Constants . ZOOKEEPER_DOLPHINSCHEDULER_WORKERS ;
List < WorkerGroup > workerGroups = new ArrayList < > ( ) ;
List < String > workerGroupList = null ;
try {
workerGroupList = zookeeperCachedOperator . getChildrenKeys ( workerPath ) ;
@ -148,32 +257,70 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
logger . error ( "getWorkerGroups exception: {}, workerPath: {}, isPaging: {}" , e . getMessage ( ) , workerPath , isPaging ) ;
}
if ( workerGroupList = = null | | workerGroupList . isEmpty ( ) ) {
if ( ! isPaging ) {
if ( CollectionUtils . isEmpty ( workerGroupList ) ) {
if ( CollectionUtils . isEmpty ( workerGroups ) & & ! isPaging ) {
WorkerGroup wg = new WorkerGroup ( ) ;
wg . setName ( DEFAULT_WORKER_GROUP ) ;
wg . setName ( Constants . DEFAULT_WORKER_GROUP ) ;
workerGroups . add ( wg ) ;
}
return workerGroups ;
}
for ( String workerGroup : workerGroupList ) {
String workerGroupPath = workerPath + SLASH + workerGroup ;
List < String > childrenNodes = zookeeperCachedOperator . getChildrenKeys ( workerGroupPath ) ;
if ( CollectionUtils . isEmpty ( childrenNodes ) ) {
String workerGroupPath = workerPath + Constants . SLASH + workerGroup ;
List < String > childrenNodes = null ;
try {
childrenNodes = zookeeperCachedOperator . getChildrenKeys ( workerGroupPath ) ;
} catch ( Exception e ) {
logger . error ( "getChildrenNodes exception: {}, workerGroupPath: {}" , e . getMessage ( ) , workerGroupPath ) ;
}
if ( childrenNodes = = null | | childrenNodes . isEmpty ( ) ) {
continue ;
}
WorkerGroup wg = new WorkerGroup ( ) ;
wg . setName ( workerGroup ) ;
if ( isPaging ) {
wg . setIpList ( childrenNodes . stream ( ) . map ( node - > Host . of ( node ) . getIp ( ) ) . collect ( Collectors . toList ( ) ) ) ;
String registeredValue = zookeeperCachedOperator . get ( workerGroupPath + SLASH + childrenNodes . get ( 0 ) ) ;
wg . setCreateTime ( DateUtils . stringToDate ( registeredValue . split ( "," ) [ 6 ] ) ) ;
wg . setUpdateTime ( DateUtils . stringToDate ( registeredValue . split ( "," ) [ 7 ] ) ) ;
wg . setAddrList ( String . join ( Constants . COMMA , childrenNodes ) ) ;
String registeredValue = zookeeperCachedOperator . get ( workerGroupPath + Constants . SLASH + childrenNodes . get ( 0 ) ) ;
wg . setCreateTime ( DateUtils . stringToDate ( registeredValue . split ( Constants . COMMA ) [ 6 ] ) ) ;
wg . setUpdateTime ( DateUtils . stringToDate ( registeredValue . split ( Constants . COMMA ) [ 7 ] ) ) ;
wg . setSystemDefault ( true ) ;
}
workerGroups . add ( wg ) ;
}
return workerGroups ;
}
/ * *
* delete worker group by id
* @param id worker group id
* @return delete result code
* /
@Override
@Transactional ( rollbackFor = Exception . class )
public Map < String , Object > deleteWorkerGroupById ( User loginUser , Integer id ) {
Map < String , Object > result = new HashMap < > ( ) ;
if ( isNotAdmin ( loginUser , result ) ) {
return result ;
}
if ( Constants . DOCKER_MODE & & ! Constants . KUBERNETES_MODE ) {
putMsg ( result , Status . DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER ) ;
return result ;
}
WorkerGroup workerGroup = workerGroupMapper . selectById ( id ) ;
if ( workerGroup = = null ) {
putMsg ( result , Status . DELETE_WORKER_GROUP_NOT_EXIST ) ;
return result ;
}
List < ProcessInstance > processInstances = processInstanceMapper . queryByWorkerGroupNameAndStatus ( workerGroup . getName ( ) , Constants . NOT_TERMINATED_STATES ) ;
if ( CollectionUtils . isNotEmpty ( processInstances ) ) {
putMsg ( result , Status . DELETE_WORKER_GROUP_BY_ID_FAIL , processInstances . size ( ) ) ;
return result ;
}
workerGroupMapper . deleteById ( id ) ;
processInstanceMapper . updateProcessInstanceByWorkerGroupName ( workerGroup . getName ( ) , "" ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
}