@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.api.service.impl ;
package org.apache.dolphinscheduler.api.service.impl ;
import com.baomidou.mybatisplus.core.metadata.IPage ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.YARN_QUEUE_CREATE ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.YARN_QUEUE_UPDATE ;
import org.apache.commons.lang3.StringUtils ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.enums.Status ;
import org.apache.dolphinscheduler.api.exceptions.ServiceException ;
import org.apache.dolphinscheduler.api.exceptions.ServiceException ;
import org.apache.dolphinscheduler.api.service.QueueService ;
import org.apache.dolphinscheduler.api.service.QueueService ;
@ -33,9 +33,10 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper ;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper ;
import org.apache.dolphinscheduler.dao.mapper.UserMapper ;
import org.apache.dolphinscheduler.dao.mapper.UserMapper ;
import org.apache.commons.lang3.StringUtils ;
import java.util.ArrayList ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.Collections ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.HashSet ;
import java.util.List ;
import java.util.List ;
@ -49,8 +50,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service ;
import org.springframework.stereotype.Service ;
import org.springframework.transaction.annotation.Transactional ;
import org.springframework.transaction.annotation.Transactional ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.YARN_QUEUE_CREATE ;
import com.baomidou.mybatisplus.core.metadata.IPage ;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.YARN_QUEUE_UPDATE ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
/ * *
/ * *
* queue service impl
* queue service impl
@ -67,41 +68,45 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
private UserMapper userMapper ;
private UserMapper userMapper ;
/ * *
/ * *
* Valid both queue and queueName when we want to create or update queue objec t
* Check the queue new object valid or n ot
*
*
* @param queue queue value
* @param queue The queue object want to create
* @param queueName queue name
* /
* /
private void queueValid ( String queue , String queueNam e) throws ServiceException {
private void createQueueValid ( Queue queu e) throws ServiceException {
if ( StringUtils . isEmpty ( queue ) ) {
if ( StringUtils . isEmpty ( queue . getQueue ( ) ) ) {
throw new ServiceException ( Status . REQUEST_PARAMS_NOT_VALID_ERROR , Constants . QUEUE ) ;
throw new ServiceException ( Status . REQUEST_PARAMS_NOT_VALID_ERROR , Constants . QUEUE ) ;
} else if ( StringUtils . isEmpty ( queueName ) ) {
} else if ( StringUtils . isEmpty ( queue . getQueue Name( ) ) ) {
throw new ServiceException ( Status . REQUEST_PARAMS_NOT_VALID_ERROR , Constants . QUEUE_NAME ) ;
throw new ServiceException ( Status . REQUEST_PARAMS_NOT_VALID_ERROR , Constants . QUEUE_NAME ) ;
} else if ( checkQueueExist ( queue ) ) {
} else if ( checkQueueExist ( queue . getQueue ( ) ) ) {
throw new ServiceException ( Status . QUEUE_VALUE_EXIST , queue ) ;
throw new ServiceException ( Status . QUEUE_VALUE_EXIST , queue . getQueue ( ) ) ;
} else if ( checkQueueNameExist ( queueName ) ) {
} else if ( checkQueueNameExist ( queue . getQueue Name( ) ) ) {
throw new ServiceException ( Status . QUEUE_NAME_EXIST , queueName ) ;
throw new ServiceException ( Status . QUEUE_NAME_EXIST , queue . getQueue Name( ) ) ;
}
}
}
}
/ * *
/ * *
* Insert one single new Queue record to database
* Check queue update object valid or not
*
*
* @param queue queue value
* @param existsQueue The exists queue object
* @param queueName queue name
* @param updateQueue The queue object want to update
* @return Queue
* /
* /
private Queue createObjToDB ( String queue , String queueName ) {
private void updateQueueValid ( Queue existsQueue , Queue updateQueue ) throws ServiceException {
Queue queueObj = new Queue ( ) ;
// Check the exists queue and the necessary of update operation, in not exist checker have to use updateQueue to avoid NPE
Date now = new Date ( ) ;
if ( Objects . isNull ( existsQueue ) ) {
throw new ServiceException ( Status . QUEUE_NOT_EXIST , updateQueue . getQueue ( ) ) ;
queueObj . setQueue ( queue ) ;
} else if ( Objects . equals ( existsQueue , updateQueue ) ) {
queueObj . setQueueName ( queueName ) ;
throw new ServiceException ( Status . NEED_NOT_UPDATE_QUEUE ) ;
queueObj . setCreateTime ( now ) ;
}
queueObj . setUpdateTime ( now ) ;
// Check the update queue parameters
// save
else if ( StringUtils . isEmpty ( updateQueue . getQueue ( ) ) ) {
queueMapper . insert ( queueObj ) ;
throw new ServiceException ( Status . REQUEST_PARAMS_NOT_VALID_ERROR , Constants . QUEUE ) ;
return queueObj ;
} else if ( StringUtils . isEmpty ( updateQueue . getQueueName ( ) ) ) {
throw new ServiceException ( Status . REQUEST_PARAMS_NOT_VALID_ERROR , Constants . QUEUE_NAME ) ;
} else if ( ! Objects . equals ( updateQueue . getQueue ( ) , existsQueue . getQueue ( ) ) & & checkQueueExist ( updateQueue . getQueue ( ) ) ) {
throw new ServiceException ( Status . QUEUE_VALUE_EXIST , updateQueue . getQueue ( ) ) ;
} else if ( ! Objects . equals ( updateQueue . getQueueName ( ) , existsQueue . getQueueName ( ) ) & & checkQueueNameExist ( updateQueue . getQueueName ( ) ) ) {
throw new ServiceException ( Status . QUEUE_NAME_EXIST , updateQueue . getQueueName ( ) ) ;
}
}
}
/ * *
/ * *
@ -169,11 +174,14 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
if ( isNotAdmin ( loginUser , result ) ) {
if ( isNotAdmin ( loginUser , result ) ) {
return result ;
return result ;
}
}
queueValid ( queue , queueName ) ;
Queue newQueue = createObjToDB ( queue , queueName ) ;
Queue queueObj = new Queue ( queueName , queue ) ;
result . put ( Constants . DATA_LIST , newQueue ) ;
createQueueValid ( queueObj ) ;
queueMapper . insert ( queueObj ) ;
result . put ( Constants . DATA_LIST , queueObj ) ;
putMsg ( result , Status . SUCCESS ) ;
putMsg ( result , Status . SUCCESS ) ;
permissionPostHandle ( AuthorizationType . QUEUE , loginUser . getId ( ) , Collections . singletonList ( queueObj . getId ( ) ) , logger ) ;
return result ;
return result ;
}
}
@ -193,35 +201,20 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
return result ;
return result ;
}
}
queueValid ( queue , queueName ) ;
Queue updateQueue = new Queue ( id , queueName , queue ) ;
Queue existsQueue = queueMapper . selectById ( id ) ;
Queue queueObj = queueMapper . selectById ( id ) ;
updateQueueValid ( existsQueue , updateQueue ) ;
if ( Objects . isNull ( queueObj ) ) {
throw new ServiceException ( Status . QUEUE_NOT_EXIST , queue ) ;
}
// whether queue value or queueName is changed
if ( queue . equals ( queueObj . getQueue ( ) ) & & queueName . equals ( queueObj . getQueueName ( ) ) ) {
throw new ServiceException ( Status . NEED_NOT_UPDATE_QUEUE ) ;
}
// check old queue using by any user
// check old queue using by any user
if ( checkIfQueueIsInUsing ( queueObj . getQueueName ( ) , queueName ) ) {
if ( checkIfQueueIsInUsing ( existsQueue . getQueueName ( ) , updateQueue . getQueueName ( ) ) ) {
//update user related old queue
//update user related old queue
Integer relatedUserNums = userMapper . updateUserQueue ( queueObj . getQueueName ( ) , queueName ) ;
Integer relatedUserNums = userMapper . updateUserQueue ( existsQueue . getQueueName ( ) , updateQueue . getQueueName ( ) ) ;
logger . info ( "old queue have related {} user, exec update user success." , relatedUserNums ) ;
logger . info ( "old queue have related {} user, exec update user success." , relatedUserNums ) ;
}
}
// update queue
queueMapper . updateById ( updateQueue ) ;
Date now = new Date ( ) ;
queueObj . setQueue ( queue ) ;
queueObj . setQueueName ( queueName ) ;
queueObj . setUpdateTime ( now ) ;
queueMapper . updateById ( queueObj ) ;
putMsg ( result , Status . SUCCESS ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
return result ;
}
}
@ -235,7 +228,9 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
@Override
@Override
public Result < Object > verifyQueue ( String queue , String queueName ) {
public Result < Object > verifyQueue ( String queue , String queueName ) {
Result < Object > result = new Result < > ( ) ;
Result < Object > result = new Result < > ( ) ;
queueValid ( queue , queueName ) ;
Queue queueValidator = new Queue ( queueName , queue ) ;
createQueueValid ( queueValidator ) ;
putMsg ( result , Status . SUCCESS ) ;
putMsg ( result , Status . SUCCESS ) ;
return result ;
return result ;
@ -287,10 +282,12 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
* /
* /
@Override
@Override
public Queue createQueueIfNotExists ( String queue , String queueName ) {
public Queue createQueueIfNotExists ( String queue , String queueName ) {
queueValid ( queue , queueName ) ;
Queue queueObj = new Queue ( queueName , queue ) ;
createQueueValid ( queueObj ) ;
Queue existsQueue = queueMapper . queryQueueName ( queue , queueName ) ;
Queue existsQueue = queueMapper . queryQueueName ( queue , queueName ) ;
if ( Objects . isNull ( existsQueue ) ) {
if ( Objects . isNull ( existsQueue ) ) {
return createObjToDB ( queue , queueName ) ;
queueMapper . insert ( queueObj ) ;
return queueObj ;
}
}
return existsQueue ;
return existsQueue ;
}
}