@ -18,25 +18,31 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.utils.PageInfo ;
import org.apache.dolphinscheduler.api.utils.PageInfo ;
import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.ZKNodeType ;
import org.apache.dolphinscheduler.common.utils.CollectionUtils ;
import org.apache.dolphinscheduler.common.utils.CollectionUtils ;
import org.apache.dolphinscheduler.common.utils.DateUtils ;
import org.apache.dolphinscheduler.common.utils.DateUtils ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.User ;
import org.apache.dolphinscheduler.dao.entity.User ;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup ;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup ;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper ;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper ;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper ;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator ;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator ;
import java.util.ArrayList ;
import java.util.ArrayList ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.HashMap ;
import java.util.List ;
import java.util.List ;
import java.util.Map ;
import java.util.Map ;
import java.util.Set ;
import java.util.stream.Collectors ;
import java.util.stream.Collectors ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import org.springframework.stereotype.Service ;
import org.springframework.transaction.annotation.Transactional ;
/ * *
/ * *
* work group service
* work group service
@ -44,14 +50,115 @@ import org.springframework.stereotype.Service;
@Service
@Service
public class WorkerGroupService extends BaseService {
public class WorkerGroupService extends BaseService {
private static final Logger logger = LoggerFactory . getLogger ( WorkerGroupService . class ) ;
@Autowired
@Autowired
ProcessInstanceMapper processInstance Mapper;
WorkerGroupMapper workerGroup Mapper;
@Autowired
@Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator ;
protected ZookeeperCachedOperator zookeeperCachedOperator ;
@Autowired
private ZookeeperMonitor zookeeperMonitor ;
@Autowired
ProcessInstanceMapper processInstanceMapper ;
/ * *
* create or update a worker group
*
* @param loginUser login user
* @param id worker group id
* @param name worker group name
* @param addrList addr list
* @return create or update result code
* /
public Map < String , Object > saveWorkerGroup ( User loginUser , int id , String name , String addrList ) {
Map < String , Object > result = new HashMap < > ( ) ;
if ( checkAdmin ( loginUser , result ) ) {
return result ;
}
if ( Constants . DOCKER_MODE & & ! Constants . KUBERNETES_MODE ) {
putMsg ( result , Status . CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER ) ;
return result ;
}
if ( StringUtils . isEmpty ( name ) ) {
putMsg ( result , Status . NAME_NULL ) ;
return result ;
}
Date now = new Date ( ) ;
WorkerGroup workerGroup ;
if ( id ! = 0 ) {
workerGroup = workerGroupMapper . selectById ( id ) ;
// check exist
if ( workerGroup = = null ) {
workerGroup = new WorkerGroup ( ) ;
workerGroup . setCreateTime ( now ) ;
}
} else {
workerGroup = new WorkerGroup ( ) ;
workerGroup . setCreateTime ( now ) ;
}
workerGroup . setName ( name ) ;
workerGroup . setAddrList ( addrList ) ;
workerGroup . setUpdateTime ( now ) ;
if ( checkWorkerGroupNameExists ( workerGroup ) ) {
putMsg ( result , Status . NAME_EXIST , workerGroup . getName ( ) ) ;
return result ;
}
String invalidAddr = checkWorkerGroupAddrList ( workerGroup ) ;
if ( invalidAddr ! = null ) {
putMsg ( result , Status . WORKER_ADDRESS_INVALID , invalidAddr ) ;
return result ;
}
if ( workerGroup . getId ( ) ! = 0 ) {
workerGroupMapper . updateById ( workerGroup ) ;
} else {
workerGroupMapper . insert ( workerGroup ) ;
}
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
/ * *
* check worker group name exists
* @param workerGroup worker group
* @return boolean
* /
private boolean checkWorkerGroupNameExists ( WorkerGroup workerGroup ) {
List < WorkerGroup > workerGroupList = workerGroupMapper . queryWorkerGroupByName ( workerGroup . getName ( ) ) ;
if ( CollectionUtils . isNotEmpty ( workerGroupList ) ) {
// new group has same name
if ( workerGroup . getId ( ) = = 0 ) {
return true ;
}
// check group id
for ( WorkerGroup group : workerGroupList ) {
if ( group . getId ( ) ! = workerGroup . getId ( ) ) {
return true ;
}
}
}
// check zookeeper
String workerGroupPath = zookeeperCachedOperator . getZookeeperConfig ( ) . getDsRoot ( ) + Constants . ZOOKEEPER_DOLPHINSCHEDULER_WORKERS + Constants . SLASH + workerGroup . getName ( ) ;
return zookeeperCachedOperator . isExisted ( workerGroupPath ) ;
}
/ * *
* check worker group addr list
* @param workerGroup worker group
* @return boolean
* /
private String checkWorkerGroupAddrList ( WorkerGroup workerGroup ) {
Map < String , String > serverMaps = zookeeperMonitor . getServerMaps ( ZKNodeType . WORKER , true ) ;
for ( String addr : workerGroup . getAddrList ( ) . split ( Constants . COMMA ) ) {
if ( ! serverMaps . containsKey ( addr ) ) {
return addr ;
}
}
return null ;
}
/ * *
/ * *
* query worker group paging
* query worker group paging
@ -63,19 +170,17 @@ public class WorkerGroupService extends BaseService {
* @return worker group list page
* @return worker group list page
* /
* /
public Map < String , Object > queryAllGroupPaging ( User loginUser , Integer pageNo , Integer pageSize , String searchVal ) {
public Map < String , Object > queryAllGroupPaging ( User loginUser , Integer pageNo , Integer pageSize , String searchVal ) {
// list from index
// list from index
Integer fromIndex = ( pageNo - 1 ) * pageSize ;
int fromIndex = ( pageNo - 1 ) * pageSize ;
// list to index
// list to index
Integer toIndex = ( pageNo - 1 ) * pageSize + pageSize ;
int toIndex = ( pageNo - 1 ) * pageSize + pageSize ;
Map < String , Object > result = new HashMap < > ( 5 ) ;
Map < String , Object > result = new HashMap < > ( ) ;
if ( checkAdmin ( loginUser , result ) ) {
if ( checkAdmin ( loginUser , result ) ) {
return result ;
return result ;
}
}
List < WorkerGroup > workerGroups = getWorkerGroups ( true ) ;
List < WorkerGroup > workerGroups = getWorkerGroups ( true ) ;
List < WorkerGroup > resultDataList = new ArrayList < > ( ) ;
List < WorkerGroup > resultDataList = new ArrayList < > ( ) ;
if ( CollectionUtils . isNotEmpty ( workerGroups ) ) {
if ( CollectionUtils . isNotEmpty ( workerGroups ) ) {
@ -91,11 +196,13 @@ public class WorkerGroupService extends BaseService {
searchValDataList = workerGroups ;
searchValDataList = workerGroups ;
}
}
if ( searchValDataList . size ( ) < pageSize ) {
if ( fromIndex < searchValDataList . size ( ) ) {
toIndex = ( pageNo - 1 ) * pageSize + searchValDataList . size ( ) ;
if ( toIndex > searchValDataList . size ( ) ) {
toIndex = searchValDataList . size ( ) ;
}
}
resultDataList = searchValDataList . subList ( fromIndex , toIndex ) ;
resultDataList = searchValDataList . subList ( fromIndex , toIndex ) ;
}
}
}
PageInfo < WorkerGroup > pageInfo = new PageInfo < > ( pageNo , pageSize ) ;
PageInfo < WorkerGroup > pageInfo = new PageInfo < > ( pageNo , pageSize ) ;
pageInfo . setTotalCount ( resultDataList . size ( ) ) ;
pageInfo . setTotalCount ( resultDataList . size ( ) ) ;
@ -106,8 +213,6 @@ public class WorkerGroupService extends BaseService {
return result ;
return result ;
}
}
/ * *
/ * *
* query all worker group
* query all worker group
*
*
@ -115,18 +220,20 @@ public class WorkerGroupService extends BaseService {
* /
* /
public Map < String , Object > queryAllGroup ( ) {
public Map < String , Object > queryAllGroup ( ) {
Map < String , Object > result = new HashMap < > ( ) ;
Map < String , Object > result = new HashMap < > ( ) ;
List < WorkerGroup > workerGroups = getWorkerGroups ( false ) ;
List < WorkerGroup > workerGroups = getWorkerGroups ( false ) ;
List < String > availableWorkerGroupList = workerGroups . stream ( )
Set < String > availableWorkerGroupSet = workerGroups . stream ( )
. map ( WorkerGroup : : getName )
. map ( workerGroup - > workerGroup . getName ( ) )
. collect ( Collectors . toList ( ) ) ;
. collect ( Collectors . toSet ( ) ) ;
int index = availableWorkerGroupList . indexOf ( Constants . DEFAULT_WORKER_GROUP ) ;
result . put ( Constants . DATA_LIST , availableWorkerGroupSet ) ;
if ( index > - 1 ) {
availableWorkerGroupList . remove ( index ) ;
availableWorkerGroupList . add ( 0 , Constants . DEFAULT_WORKER_GROUP ) ;
}
result . put ( Constants . DATA_LIST , availableWorkerGroupList ) ;
putMsg ( result , Status . SUCCESS ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
return result ;
}
}
/ * *
/ * *
* get worker groups
* get worker groups
*
*
@ -134,29 +241,93 @@ public class WorkerGroupService extends BaseService {
* @return WorkerGroup list
* @return WorkerGroup list
* /
* /
private List < WorkerGroup > getWorkerGroups ( boolean isPaging ) {
private List < WorkerGroup > getWorkerGroups ( boolean isPaging ) {
String workerPath = zookeeperCachedOperator . getZookeeperConfig ( ) . getDsRoot ( ) + "/nodes" + "/worker" ;
// worker groups from database
List < String > workerGroupList = zookeeperCachedOperator . getChildrenKeys ( workerPath ) ;
List < WorkerGroup > workerGroups = workerGroupMapper . queryAllWorkerGroup ( ) ;
// worker groups from zookeeper
String workerPath = zookeeperCachedOperator . getZookeeperConfig ( ) . getDsRoot ( ) + Constants . ZOOKEEPER_DOLPHINSCHEDULER_WORKERS ;
List < String > workerGroupList = null ;
try {
workerGroupList = zookeeperCachedOperator . getChildrenKeys ( workerPath ) ;
} catch ( Exception e ) {
logger . error ( "getWorkerGroups exception: {}, workerPath: {}, isPaging: {}" , e . getMessage ( ) , workerPath , isPaging ) ;
}
// available workerGroup list
if ( CollectionUtils . isEmpty ( workerGroupList ) ) {
List < String > availableWorkerGroupList = new ArrayList < > ( ) ;
if ( CollectionUtils . isEmpty ( workerGroups ) & & ! isPaging ) {
List < WorkerGroup > workerGroups = new ArrayList < > ( ) ;
WorkerGroup wg = new WorkerGroup ( ) ;
wg . setName ( Constants . DEFAULT_WORKER_GROUP ) ;
workerGroups . add ( wg ) ;
}
return workerGroups ;
}
for ( String workerGroup : workerGroupList ) {
for ( String workerGroup : workerGroupList ) {
String workerGroupPath = workerPath + "/" + workerGroup ;
String workerGroupPath = workerPath + Constants . SLASH + workerGroup ;
List < String > childrenNodes = zookeeperCachedOperator . getChildrenKeys ( workerGroupPath ) ;
List < String > childrenNodes = null ;
if ( CollectionUtils . isNotEmpty ( childrenNodes ) ) {
try {
availableWorkerGroupList . add ( workerGroup ) ;
childrenNodes = zookeeperCachedOperator . getChildrenKeys ( workerGroupPath ) ;
} catch ( Exception e ) {
logger . error ( "getChildrenNodes exception: {}, workerGroupPath: {}" , e . getMessage ( ) , workerGroupPath ) ;
}
if ( childrenNodes = = null | | childrenNodes . isEmpty ( ) ) {
continue ;
}
WorkerGroup wg = new WorkerGroup ( ) ;
WorkerGroup wg = new WorkerGroup ( ) ;
wg . setName ( workerGroup ) ;
wg . setName ( workerGroup ) ;
if ( isPaging ) {
if ( isPaging ) {
wg . setIpList ( childrenNodes . stream ( ) . map ( node - > Host . of ( node ) . getIp ( ) ) . collect ( Collectors . toList ( ) ) ) ;
wg . setAddrList ( String . join ( Constants . COMMA , childrenNodes ) ) ;
String registeredValue = zookeeperCachedOperator . get ( workerGroupPath + "/" + childrenNodes . get ( 0 ) ) ;
String registeredValue = zookeeperCachedOperator . get ( workerGroupPath + Constants . SLASH + childrenNodes . get ( 0 ) ) ;
wg . setCreateTime ( DateUtils . stringToDate ( registeredValue . split ( "," ) [ 6 ] ) ) ;
wg . setCreateTime ( DateUtils . stringToDate ( registeredValue . split ( Constants . COMMA ) [ 6 ] ) ) ;
wg . setUpdateTime ( DateUtils . stringToDate ( registeredValue . split ( "," ) [ 7 ] ) ) ;
wg . setUpdateTime ( DateUtils . stringToDate ( registeredValue . split ( Constants . COMMA ) [ 7 ] ) ) ;
wg . setSystemDefault ( true ) ;
}
}
workerGroups . add ( wg ) ;
workerGroups . add ( wg ) ;
}
}
}
return workerGroups ;
return workerGroups ;
}
}
/ * *
* delete worker group by id
* @param id worker group id
* @return delete result code
* /
@Transactional ( rollbackFor = Exception . class )
public Map < String , Object > deleteWorkerGroupById ( User loginUser , Integer id ) {
Map < String , Object > result = new HashMap < > ( ) ;
if ( checkAdmin ( loginUser , result ) ) {
return result ;
}
if ( Constants . DOCKER_MODE & & ! Constants . KUBERNETES_MODE ) {
putMsg ( result , Status . DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER ) ;
return result ;
}
WorkerGroup workerGroup = workerGroupMapper . selectById ( id ) ;
if ( workerGroup = = null ) {
putMsg ( result , Status . DELETE_WORKER_GROUP_NOT_EXIST ) ;
return result ;
}
List < ProcessInstance > processInstances = processInstanceMapper . queryByWorkerGroupNameAndStatus ( workerGroup . getName ( ) , Constants . NOT_TERMINATED_STATES ) ;
if ( CollectionUtils . isNotEmpty ( processInstances ) ) {
putMsg ( result , Status . DELETE_WORKER_GROUP_BY_ID_FAIL , processInstances . size ( ) ) ;
return result ;
}
workerGroupMapper . deleteById ( id ) ;
processInstanceMapper . updateProcessInstanceByWorkerGroupName ( workerGroup . getName ( ) , "" ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
/ * *
* query all worker address list
*
* @return all worker address list
* /
public Map < String , Object > getWorkerAddressList ( ) {
Map < String , Object > result = new HashMap < > ( ) ;
List < String > serverNodeList = zookeeperMonitor . getServerNodeList ( ZKNodeType . WORKER , true ) ;
result . put ( Constants . DATA_LIST , serverNodeList ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
}
}
}