@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License .
* /
package org.apache.dolphinscheduler.api.service ;
import com.baomidou.mybatisplus.core.metadata.IPage ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent ;
import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor ;
import org.apache.dolphinscheduler.api.enums.Status ;
@ -29,20 +28,49 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag ;
import org.apache.dolphinscheduler.common.enums.ResourceType ;
import org.apache.dolphinscheduler.common.enums.UserType ;
import org.apache.dolphinscheduler.common.utils.* ;
import org.apache.dolphinscheduler.dao.entity.* ;
import org.apache.dolphinscheduler.dao.mapper.* ;
import org.apache.dolphinscheduler.common.utils.CollectionUtils ;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils ;
import org.apache.dolphinscheduler.common.utils.HadoopUtils ;
import org.apache.dolphinscheduler.common.utils.PropertyUtils ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.dolphinscheduler.dao.entity.AlertGroup ;
import org.apache.dolphinscheduler.dao.entity.DatasourceUser ;
import org.apache.dolphinscheduler.dao.entity.ProjectUser ;
import org.apache.dolphinscheduler.dao.entity.Resource ;
import org.apache.dolphinscheduler.dao.entity.ResourcesUser ;
import org.apache.dolphinscheduler.dao.entity.Tenant ;
import org.apache.dolphinscheduler.dao.entity.UDFUser ;
import org.apache.dolphinscheduler.dao.entity.User ;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper ;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper ;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper ;
import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper ;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper ;
import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper ;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper ;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper ;
import org.apache.dolphinscheduler.dao.mapper.UserMapper ;
import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils ;
import java.io.IOException ;
import java.text.MessageFormat ;
import java.util.ArrayList ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import java.util.stream.Collectors ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import org.springframework.transaction.annotation.Transactional ;
import java.io.IOException ;
import java.text.MessageFormat ;
import java.util.* ;
import java.util.stream.Collectors ;
import com.baomidou.mybatisplus.core.metadata.IPage ;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page ;
/ * *
* user service
@ -109,7 +137,7 @@ public class UsersService extends BaseService {
String msg = this . checkUserParams ( userName , userPassword , email , phone ) ;
if ( ! StringUtils . isEmpty ( msg ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , msg ) ;
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , msg ) ;
return result ;
}
if ( ! isAdmin ( loginUser ) ) {
@ -126,12 +154,12 @@ public class UsersService extends BaseService {
Tenant tenant = tenantMapper . queryById ( tenantId ) ;
// resource upload startup
if ( PropertyUtils . getResUploadStartupState ( ) ) {
if ( PropertyUtils . getResUploadStartupState ( ) ) {
// if tenant not exists
if ( ! HadoopUtils . getInstance ( ) . exists ( HadoopUtils . getHdfsTenantDir ( tenant . getTenantCode ( ) ) ) ) {
if ( ! HadoopUtils . getInstance ( ) . exists ( HadoopUtils . getHdfsTenantDir ( tenant . getTenantCode ( ) ) ) ) {
createTenantDirIfNotExists ( tenant . getTenantCode ( ) ) ;
}
String userPath = HadoopUtils . getHdfsUserDir ( tenant . getTenantCode ( ) , user . getId ( ) ) ;
String userPath = HadoopUtils . getHdfsUserDir ( tenant . getTenantCode ( ) , user . getId ( ) ) ;
HadoopUtils . getInstance ( ) . mkdir ( userPath ) ;
}
@ -142,12 +170,12 @@ public class UsersService extends BaseService {
@Transactional ( rollbackFor = RuntimeException . class )
public User createUser ( String userName ,
String userPassword ,
String email ,
int tenantId ,
String phone ,
String queue ,
int state ) {
String userPassword ,
String email ,
int tenantId ,
String phone ,
String queue ,
int state ) {
User user = new User ( ) ;
Date now = new Date ( ) ;
@ -161,7 +189,7 @@ public class UsersService extends BaseService {
user . setUserType ( UserType . GENERAL_USER ) ;
user . setCreateTime ( now ) ;
user . setUpdateTime ( now ) ;
if ( StringUtils . isEmpty ( queue ) ) {
if ( StringUtils . isEmpty ( queue ) ) {
queue = "" ;
}
user . setQueue ( queue ) ;
@ -171,8 +199,40 @@ public class UsersService extends BaseService {
return user ;
}
/ * * *
* create User for ldap login
* /
@Transactional ( rollbackFor = Exception . class )
public User createUser ( UserType userType , String userId , String email ) {
User user = new User ( ) ;
Date now = new Date ( ) ;
user . setUserName ( userId ) ;
user . setEmail ( email ) ;
// create general users, administrator users are currently built-in
user . setUserType ( userType ) ;
user . setCreateTime ( now ) ;
user . setUpdateTime ( now ) ;
user . setQueue ( "" ) ;
// save user
userMapper . insert ( user ) ;
return user ;
}
/ * *
* get user by user name
*
* @param userName user name
* @return exist user or null
* /
public User getUserByUserName ( String userName ) {
return userMapper . queryByUserNameAccurately ( userName ) ;
}
/ * *
* query user by id
*
* @param id id
* @return user info
* /
@ -182,6 +242,7 @@ public class UsersService extends BaseService {
/ * *
* query user
*
* @param name name
* @return user info
* /
@ -203,6 +264,7 @@ public class UsersService extends BaseService {
/ * *
* get user id by user name
*
* @param name user name
* @return if name empty 0 , user not exists - 1 , user exist user id
* /
@ -242,7 +304,7 @@ public class UsersService extends BaseService {
IPage < User > scheduleList = userMapper . queryUserPaging ( page , searchVal ) ;
PageInfo < User > pageInfo = new PageInfo < > ( pageNo , pageSize ) ;
pageInfo . setTotalCount ( ( int ) scheduleList . getTotal ( ) ) ;
pageInfo . setTotalCount ( ( int ) scheduleList . getTotal ( ) ) ;
pageInfo . setLists ( scheduleList . getRecords ( ) ) ;
result . put ( Constants . DATA_LIST , pageInfo ) ;
putMsg ( result , Status . SUCCESS ) ;
@ -261,7 +323,7 @@ public class UsersService extends BaseService {
* @param email email
* @param tenantId tennat id
* @param phone phone
* @param queue queue
* @param queue queue
* @return update result code
* @throws Exception exception
* /
@ -286,8 +348,8 @@ public class UsersService extends BaseService {
}
if ( StringUtils . isNotEmpty ( userName ) ) {
if ( ! CheckUtils . checkUserName ( userName ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , userName ) ;
if ( ! CheckUtils . checkUserName ( userName ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , userName ) ;
return result ;
}
@ -300,23 +362,23 @@ public class UsersService extends BaseService {
}
if ( StringUtils . isNotEmpty ( userPassword ) ) {
if ( ! CheckUtils . checkPassword ( userPassword ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , userPassword ) ;
if ( ! CheckUtils . checkPassword ( userPassword ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , userPassword ) ;
return result ;
}
user . setUserPassword ( EncryptionUtils . getMd5 ( userPassword ) ) ;
}
if ( StringUtils . isNotEmpty ( email ) ) {
if ( ! CheckUtils . checkEmail ( email ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , email ) ;
if ( ! CheckUtils . checkEmail ( email ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , email ) ;
return result ;
}
user . setEmail ( email ) ;
}
if ( StringUtils . isNotEmpty ( phone ) & & ! CheckUtils . checkPhone ( phone ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , phone ) ;
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , phone ) ;
return result ;
}
user . setPhone ( phone ) ;
@ -332,13 +394,13 @@ public class UsersService extends BaseService {
Tenant newTenant = tenantMapper . queryById ( tenantId ) ;
if ( newTenant ! = null ) {
// if hdfs startup
if ( PropertyUtils . getResUploadStartupState ( ) & & oldTenant ! = null ) {
if ( PropertyUtils . getResUploadStartupState ( ) & & oldTenant ! = null ) {
String newTenantCode = newTenant . getTenantCode ( ) ;
String oldResourcePath = HadoopUtils . getHdfsResDir ( oldTenant . getTenantCode ( ) ) ;
String oldUdfsPath = HadoopUtils . getHdfsUdfDir ( oldTenant . getTenantCode ( ) ) ;
// if old tenant dir exists
if ( HadoopUtils . getInstance ( ) . exists ( oldResourcePath ) ) {
if ( HadoopUtils . getInstance ( ) . exists ( oldResourcePath ) ) {
String newResourcePath = HadoopUtils . getHdfsResDir ( newTenantCode ) ;
String newUdfsPath = HadoopUtils . getHdfsUdfDir ( newTenantCode ) ;
@ -361,18 +423,18 @@ public class UsersService extends BaseService {
}
//Delete the user from the old tenant directory
String oldUserPath = HadoopUtils . getHdfsUserDir ( oldTenant . getTenantCode ( ) , userId ) ;
String oldUserPath = HadoopUtils . getHdfsUserDir ( oldTenant . getTenantCode ( ) , userId ) ;
HadoopUtils . getInstance ( ) . delete ( oldUserPath , true ) ;
} else {
} else {
// if old tenant dir not exists , create
createTenantDirIfNotExists ( oldTenant . getTenantCode ( ) ) ;
}
if ( HadoopUtils . getInstance ( ) . exists ( HadoopUtils . getHdfsTenantDir ( newTenant . getTenantCode ( ) ) ) ) {
if ( HadoopUtils . getInstance ( ) . exists ( HadoopUtils . getHdfsTenantDir ( newTenant . getTenantCode ( ) ) ) ) {
//create user in the new tenant directory
String newUserPath = HadoopUtils . getHdfsUserDir ( newTenant . getTenantCode ( ) , user . getId ( ) ) ;
String newUserPath = HadoopUtils . getHdfsUserDir ( newTenant . getTenantCode ( ) , user . getId ( ) ) ;
HadoopUtils . getInstance ( ) . mkdir ( newUserPath ) ;
} else {
} else {
// if new tenant dir not exists , create
createTenantDirIfNotExists ( newTenant . getTenantCode ( ) ) ;
}
@ -414,7 +476,7 @@ public class UsersService extends BaseService {
if ( user ! = null ) {
if ( PropertyUtils . getResUploadStartupState ( ) ) {
String userPath = HadoopUtils . getHdfsUserDir ( user . getTenantCode ( ) , id ) ;
String userPath = HadoopUtils . getHdfsUserDir ( user . getTenantCode ( ) , id ) ;
if ( HadoopUtils . getInstance ( ) . exists ( userPath ) ) {
HadoopUtils . getInstance ( ) . delete ( userPath , true ) ;
}
@ -493,7 +555,7 @@ public class UsersService extends BaseService {
return result ;
}
User user = userMapper . selectById ( userId ) ;
if ( user = = null ) {
if ( user = = null ) {
putMsg ( result , Status . USER_NOT_EXIST , userId ) ;
return result ;
}
@ -504,7 +566,7 @@ public class UsersService extends BaseService {
// need authorize resource id set
for ( String resourceFullId : resourceFullIdArr ) {
String [ ] resourceIdArr = resourceFullId . split ( "-" ) ;
for ( int i = 0 ; i < = resourceIdArr . length - 1 ; i + + ) {
for ( int i = 0 ; i < = resourceIdArr . length - 1 ; i + + ) {
int resourceIdValue = Integer . parseInt ( resourceIdArr [ i ] ) ;
needAuthorizeResIds . add ( resourceIdValue ) ;
}
@ -531,7 +593,7 @@ public class UsersService extends BaseService {
if ( CollectionUtils . isNotEmpty ( resourceIdSet ) ) {
logger . error ( "can't be deleted,because it is used of process definition" ) ;
for ( Integer resId : resourceIdSet ) {
logger . error ( "resource id:{} is used of process definition {}" , resId , resourceProcessMap . get ( resId ) ) ;
logger . error ( "resource id:{} is used of process definition {}" , resId , resourceProcessMap . get ( resId ) ) ;
}
putMsg ( result , Status . RESOURCE_IS_USED ) ;
return result ;
@ -558,7 +620,7 @@ public class UsersService extends BaseService {
resourcesUser . setResourcesId ( resourceIdValue ) ;
if ( resource . isDirectory ( ) ) {
resourcesUser . setPerm ( Constants . AUTHORIZE_READABLE_PERM ) ;
} else {
} else {
resourcesUser . setPerm ( Constants . AUTHORIZE_WRITABLE_PERM ) ;
}
@ -591,7 +653,7 @@ public class UsersService extends BaseService {
return result ;
}
User user = userMapper . selectById ( userId ) ;
if ( user = = null ) {
if ( user = = null ) {
putMsg ( result , Status . USER_NOT_EXIST , userId ) ;
return result ;
}
@ -626,7 +688,7 @@ public class UsersService extends BaseService {
*
* @param loginUser login user
* @param userId user id
* @param datasourceIds data source id array
* @param datasourceIds data source id array
* @return grant result code
* /
@Transactional ( rollbackFor = RuntimeException . class )
@ -639,7 +701,7 @@ public class UsersService extends BaseService {
return result ;
}
User user = userMapper . selectById ( userId ) ;
if ( user = = null ) {
if ( user = = null ) {
putMsg ( result , Status . USER_NOT_EXIST , userId ) ;
return result ;
}
@ -738,7 +800,7 @@ public class UsersService extends BaseService {
return result ;
}
List < User > userList = userMapper . selectList ( null ) ;
List < User > userList = userMapper . selectList ( null ) ;
result . put ( Constants . DATA_LIST , userList ) ;
putMsg ( result , Status . SUCCESS ) ;
@ -782,7 +844,7 @@ public class UsersService extends BaseService {
return result ;
}
List < User > userList = userMapper . selectList ( null ) ;
List < User > userList = userMapper . selectList ( null ) ;
List < User > resultUsers = new ArrayList < > ( ) ;
Set < User > userSet = null ;
if ( userList ! = null & & userList . size ( ) > 0 ) {
@ -833,11 +895,6 @@ public class UsersService extends BaseService {
}
/ * *
*
* @param userName
* @param password
* @param email
* @param phone
* @return if check failed return the field , otherwise return null
* /
private String checkUserParams ( String userName , String password , String email , String phone ) {
@ -862,35 +919,36 @@ public class UsersService extends BaseService {
/ * *
* copy resource files
*
* @param resourceComponent resource component
* @param srcBasePath src base path
* @param dstBasePath dst base path
* @throws IOException io exception
* @param srcBasePath src base path
* @param dstBasePath dst base path
* @throws IOException io exception
* /
private void copyResourceFiles ( ResourceComponent resourceComponent , String srcBasePath , String dstBasePath ) throws IOException {
List < ResourceComponent > components = resourceComponent . getChildren ( ) ;
if ( CollectionUtils . isNotEmpty ( components ) ) {
for ( ResourceComponent component : components ) {
for ( ResourceComponent component : components ) {
// verify whether exist
if ( ! HadoopUtils . getInstance ( ) . exists ( String . format ( "%s/%s" , srcBasePath , component . getFullName ( ) ) ) ) {
logger . error ( "resource file: {} not exist,copy error" , component . getFullName ( ) ) ;
if ( ! HadoopUtils . getInstance ( ) . exists ( String . format ( "%s/%s" , srcBasePath , component . getFullName ( ) ) ) ) {
logger . error ( "resource file: {} not exist,copy error" , component . getFullName ( ) ) ;
throw new ServiceException ( Status . RESOURCE_NOT_EXIST ) ;
}
if ( ! component . isDirctory ( ) ) {
// copy it to dst
HadoopUtils . getInstance ( ) . copy ( String . format ( "%s/%s" , srcBasePath , component . getFullName ( ) ) , String . format ( "%s/%s" , dstBasePath , component . getFullName ( ) ) , false , true ) ;
HadoopUtils . getInstance ( ) . copy ( String . format ( "%s/%s" , srcBasePath , component . getFullName ( ) ) , String . format ( "%s/%s" , dstBasePath , component . getFullName ( ) ) , false , true ) ;
continue ;
}
if ( CollectionUtils . isEmpty ( component . getChildren ( ) ) ) {
if ( CollectionUtils . isEmpty ( component . getChildren ( ) ) ) {
// if not exist,need create it
if ( ! HadoopUtils . getInstance ( ) . exists ( String . format ( "%s/%s" , dstBasePath , component . getFullName ( ) ) ) ) {
HadoopUtils . getInstance ( ) . mkdir ( String . format ( "%s/%s" , dstBasePath , component . getFullName ( ) ) ) ;
if ( ! HadoopUtils . getInstance ( ) . exists ( String . format ( "%s/%s" , dstBasePath , component . getFullName ( ) ) ) ) {
HadoopUtils . getInstance ( ) . mkdir ( String . format ( "%s/%s" , dstBasePath , component . getFullName ( ) ) ) ;
}
} else {
copyResourceFiles ( component , srcBasePath , dstBasePath ) ;
} else {
copyResourceFiles ( component , srcBasePath , dstBasePath ) ;
}
}
}
@ -899,10 +957,10 @@ public class UsersService extends BaseService {
/ * *
* register user , default state is 0 , default tenant_id is 1 , no phone , no queue
*
* @param userName user name
* @param userPassword user password
* @param userName user name
* @param userPassword user password
* @param repeatPassword repeat password
* @param email email
* @param email email
* @return register result code
* @throws Exception exception
* /
@ -914,7 +972,7 @@ public class UsersService extends BaseService {
String msg = this . checkUserParams ( userName , userPassword , email , "" ) ;
if ( ! StringUtils . isEmpty ( msg ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , msg ) ;
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , msg ) ;
return result ;
}
@ -932,7 +990,7 @@ public class UsersService extends BaseService {
* activate user , only system admin have permission , change user state code 0 to 1
*
* @param loginUser login user
* @param userName user name
* @param userName user name
* @return create result code
* /
public Map < String , Object > activateUser ( User loginUser , String userName ) {
@ -944,7 +1002,7 @@ public class UsersService extends BaseService {
return result ;
}
if ( ! CheckUtils . checkUserName ( userName ) ) {
if ( ! CheckUtils . checkUserName ( userName ) ) {
putMsg ( result , Status . REQUEST_PARAMS_NOT_VALID_ERROR , userName ) ;
return result ;
}