From 9ae29a756f0aeed894c80f5e495d786ccf03f41f Mon Sep 17 00:00:00 2001
From: Shiwen Cheng
Date: Thu, 18 Feb 2021 23:17:01 +0800
Subject: [PATCH] [Improvement-3369][api] Introduce resources, scheduler and
 taskinstance service interface for clear code (#4766)

* [Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code
---
 .../api/service/ResourcesService.java         | 1200 +--------------
 .../api/service/SchedulerService.java         |  509 +------
 .../api/service/TaskInstanceService.java      |  154 +-
 .../service/impl/AccessTokenServiceImpl.java  |   10 +-
 .../service/impl/DataAnalysisServiceImpl.java |    6 +-
 .../impl/ProcessDefinitionServiceImpl.java    |   19 +-
 .../service/impl/ResourcesServiceImpl.java    | 1313 +++++++++++++++++
 .../service/impl/SchedulerServiceImpl.java    |  600 ++++++++
 .../service/impl/TaskInstanceServiceImpl.java |  208 +++
 .../api/service/impl/TenantServiceImpl.java   |   12 +-
 .../api/service/impl/UsersServiceImpl.java    |   25 +-
 .../api/utils/RegexUtils.java                 |    9 +
 .../api/service/ResourcesServiceTest.java     |    3 +-
 .../api/service/SchedulerServiceTest.java     |    3 +-
 .../api/service/TaskInstanceServiceTest.java  |    3 +-
 .../common/utils/StringUtils.java             |    8 +-
 16 files changed, 2250 insertions(+), 1832 deletions(-)
 create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
 create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
 create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index e7d8906f28..bb778dd4eb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -14,68 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
*/ + package org.apache.dolphinscheduler.api.service; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.fasterxml.jackson.databind.SerializationFeature; -import org.apache.commons.collections.BeanMap; -import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; -import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter; -import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor; -import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.exceptions.ServiceException; -import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ResourceType; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.dao.mapper.*; -import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DuplicateKeyException; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.web.multipart.MultipartFile; +import org.apache.dolphinscheduler.dao.entity.User; import java.io.IOException; -import java.text.MessageFormat; -import java.util.*; -import java.util.regex.Matcher; -import java.util.stream.Collectors; +import java.util.Map; -import static org.apache.dolphinscheduler.common.Constants.*; +import org.springframework.web.multipart.MultipartFile; /** * resources service */ -@Service -public class ResourcesService extends BaseService { - - private static final Logger logger = LoggerFactory.getLogger(ResourcesService.class); - - @Autowired - private ResourceMapper resourcesMapper; - - @Autowired - private UdfFuncMapper udfFunctionMapper; - - @Autowired - private TenantMapper tenantMapper; - - @Autowired - private UserMapper userMapper; - - @Autowired - private ResourceUserMapper resourceUserMapper; - - @Autowired - private ProcessDefinitionMapper processDefinitionMapper; +public interface ResourcesService { /** * create directory @@ -88,74 +43,12 @@ public class ResourcesService extends BaseService { * @param currentDir current directory * @return create directory result */ - @Transactional(rollbackFor = Exception.class) - public Result createDirectory(User loginUser, - String name, - String description, - ResourceType type, - int pid, - String currentDir) { - Result result = new Result(); - // if hdfs not startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - String fullName = currentDir.equals("/") ? 
String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name); - result = verifyResourceName(fullName,type,loginUser); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - return result; - } - if (pid != -1) { - Resource parentResource = resourcesMapper.selectById(pid); - - if (parentResource == null) { - putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST); - return result; - } - - if (!hasPerm(loginUser, parentResource.getUserId())) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - } - - - if (checkResourceExists(fullName, 0, type.ordinal())) { - logger.error("resource directory {} has exist, can't recreate", fullName); - putMsg(result, Status.RESOURCE_EXIST); - return result; - } - - Date now = new Date(); - - Resource resource = new Resource(pid,name,fullName,true,description,name,loginUser.getId(),type,0,now,now); - - try { - resourcesMapper.insert(resource); - - putMsg(result, Status.SUCCESS); - Map dataMap = new BeanMap(resource); - Map resultMap = new HashMap(); - for (Map.Entry entry: dataMap.entrySet()) { - if (!"class".equalsIgnoreCase(entry.getKey().toString())) { - resultMap.put(entry.getKey().toString(), entry.getValue()); - } - } - result.setData(resultMap); - } catch (DuplicateKeyException e) { - logger.error("resource directory {} has exist, can't recreate", fullName); - putMsg(result, Status.RESOURCE_EXIST); - return result; - } catch (Exception e) { - logger.error("resource already exists, can't recreate ", e); - throw new RuntimeException("resource already exists, can't recreate"); - } - //create directory in hdfs - createDirecotry(loginUser,fullName,type,result); - return result; - } + Result createDirectory(User loginUser, + String name, + String description, + ResourceType type, + int pid, + String currentDir); /** * create resource @@ -169,121 +62,13 @@ public class ResourcesService extends BaseService { * @param currentDir current directory * @return create result code */ - @Transactional(rollbackFor = Exception.class) - public Result createResource(User loginUser, - String name, - String desc, - ResourceType type, - MultipartFile file, - int pid, - String currentDir) { - Result result = new Result(); - - // if hdfs not startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - - if (pid != -1) { - Resource parentResource = resourcesMapper.selectById(pid); - - if (parentResource == null) { - putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST); - return result; - } - - if (!hasPerm(loginUser, parentResource.getUserId())) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - } - - // file is empty - if (file.isEmpty()) { - logger.error("file is empty: {}", file.getOriginalFilename()); - putMsg(result, Status.RESOURCE_FILE_IS_EMPTY); - return result; - } - - // file suffix - String fileSuffix = FileUtils.suffix(file.getOriginalFilename()); - String nameSuffix = FileUtils.suffix(name); - - // determine file suffix - if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) { - /** - * rename file suffix and original suffix must be consistent - */ - logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename()); - putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE); - return result; - } - - //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must 
be .jar - if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(fileSuffix)) { - logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg()); - putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR); - return result; - } - if (file.getSize() > Constants.MAX_FILE_SIZE) { - logger.error("file size is too large: {}", file.getOriginalFilename()); - putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT); - return result; - } - - // check resoure name exists - String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name); - if (checkResourceExists(fullName, 0, type.ordinal())) { - logger.error("resource {} has exist, can't recreate", name); - putMsg(result, Status.RESOURCE_EXIST); - return result; - } - - Date now = new Date(); - Resource resource = new Resource(pid,name,fullName,false,desc,file.getOriginalFilename(),loginUser.getId(),type,file.getSize(),now,now); - - try { - resourcesMapper.insert(resource); - - putMsg(result, Status.SUCCESS); - Map dataMap = new BeanMap(resource); - Map resultMap = new HashMap<>(); - for (Map.Entry entry: dataMap.entrySet()) { - if (!"class".equalsIgnoreCase(entry.getKey().toString())) { - resultMap.put(entry.getKey().toString(), entry.getValue()); - } - } - result.setData(resultMap); - } catch (Exception e) { - logger.error("resource already exists, can't recreate ", e); - throw new RuntimeException("resource already exists, can't recreate"); - } - - // fail upload - if (!upload(loginUser, fullName, file, type)) { - logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename()); - putMsg(result, Status.HDFS_OPERATION_ERROR); - throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename())); - } - return result; - } - - /** - * check resource is exists - * - * @param fullName fullName - * @param userId user id - * @param type type - * @return true if resource exists - */ - private boolean checkResourceExists(String fullName, int userId, int type ){ - - List resources = resourcesMapper.queryResourceList(fullName, userId, type); - return resources != null && resources.size() > 0; - } - + Result createResource(User loginUser, + String name, + String desc, + ResourceType type, + MultipartFile file, + int pid, + String currentDir); /** * update resource @@ -295,239 +80,12 @@ public class ResourcesService extends BaseService { * @param file resource file * @return update result code */ - @Transactional(rollbackFor = Exception.class) - public Result updateResource(User loginUser, - int resourceId, - String name, - String desc, - ResourceType type, - MultipartFile file) { - Result result = new Result(); - - // if resource upload startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - - Resource resource = resourcesMapper.selectById(resourceId); - if (resource == null) { - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - if (!hasPerm(loginUser, resource.getUserId())) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - - if (file == null && name.equals(resource.getAlias()) && desc.equals(resource.getDescription())) { - putMsg(result, Status.SUCCESS); - return result; - } - - //check resource aleady exists - String originFullName = resource.getFullName(); - String originResourceName = resource.getAlias(); - - String fullName = 
String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/")+1),name); - if (!originResourceName.equals(name) && checkResourceExists(fullName, 0, type.ordinal())) { - logger.error("resource {} already exists, can't recreate", name); - putMsg(result, Status.RESOURCE_EXIST); - return result; - } - - if (file != null) { - - // file is empty - if (file.isEmpty()) { - logger.error("file is empty: {}", file.getOriginalFilename()); - putMsg(result, Status.RESOURCE_FILE_IS_EMPTY); - return result; - } - - // file suffix - String fileSuffix = FileUtils.suffix(file.getOriginalFilename()); - String nameSuffix = FileUtils.suffix(name); - - // determine file suffix - if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) { - /** - * rename file suffix and original suffix must be consistent - */ - logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename()); - putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE); - return result; - } - - //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar - if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) { - logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg()); - putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR); - return result; - } - if (file.getSize() > Constants.MAX_FILE_SIZE) { - logger.error("file size is too large: {}", file.getOriginalFilename()); - putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT); - return result; - } - } - - // query tenant by user id - String tenantCode = getTenantCode(resource.getUserId(),result); - if (StringUtils.isEmpty(tenantCode)){ - return result; - } - // verify whether the resource exists in storage - // get the path of origin file in storage - String originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,originFullName); - try { - if (!HadoopUtils.getInstance().exists(originHdfsFileName)) { - logger.error("{} not exist", originHdfsFileName); - putMsg(result,Status.RESOURCE_NOT_EXIST); - return result; - } - } catch (IOException e) { - logger.error(e.getMessage(),e); - throw new ServiceException(Status.HDFS_OPERATION_ERROR); - } - - if (!resource.isDirectory()) { - //get the origin file suffix - String originSuffix = FileUtils.suffix(originFullName); - String suffix = FileUtils.suffix(fullName); - boolean suffixIsChanged = false; - if (StringUtils.isBlank(suffix) && StringUtils.isNotBlank(originSuffix)) { - suffixIsChanged = true; - } - if (StringUtils.isNotBlank(suffix) && !suffix.equals(originSuffix)) { - suffixIsChanged = true; - } - //verify whether suffix is changed - if (suffixIsChanged) { - //need verify whether this resource is authorized to other users - Map columnMap = new HashMap<>(); - columnMap.put("resources_id", resourceId); - - List resourcesUsers = resourceUserMapper.selectByMap(columnMap); - if (CollectionUtils.isNotEmpty(resourcesUsers)) { - List userIds = resourcesUsers.stream().map(ResourcesUser::getUserId).collect(Collectors.toList()); - List users = userMapper.selectBatchIds(userIds); - String userNames = users.stream().map(User::getUserName).collect(Collectors.toList()).toString(); - logger.error("resource is authorized to user {},suffix not allowed to be modified", userNames); - putMsg(result,Status.RESOURCE_IS_AUTHORIZED,userNames); - return result; - } - } - } - - // updateResource data - Date now = new Date(); - - resource.setAlias(name); - resource.setFullName(fullName); - 
resource.setDescription(desc); - resource.setUpdateTime(now); - if (file != null) { - resource.setFileName(file.getOriginalFilename()); - resource.setSize(file.getSize()); - } - - try { - resourcesMapper.updateById(resource); - if (resource.isDirectory()) { - List childrenResource = listAllChildren(resource,false); - if (CollectionUtils.isNotEmpty(childrenResource)) { - String matcherFullName = Matcher.quoteReplacement(fullName); - List childResourceList = new ArrayList<>(); - Integer[] childResIdArray = childrenResource.toArray(new Integer[childrenResource.size()]); - List resourceList = resourcesMapper.listResourceByIds(childResIdArray); - childResourceList = resourceList.stream().map(t -> { - t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName)); - t.setUpdateTime(now); - return t; - }).collect(Collectors.toList()); - resourcesMapper.batchUpdateResource(childResourceList); - - if (ResourceType.UDF.equals(resource.getType())) { - List udfFuncs = udfFunctionMapper.listUdfByResourceId(childResIdArray); - if (CollectionUtils.isNotEmpty(udfFuncs)) { - udfFuncs = udfFuncs.stream().map(t -> { - t.setResourceName(t.getResourceName().replaceFirst(originFullName, matcherFullName)); - t.setUpdateTime(now); - return t; - }).collect(Collectors.toList()); - udfFunctionMapper.batchUpdateUdfFunc(udfFuncs); - } - } - } - } else if (ResourceType.UDF.equals(resource.getType())) { - List udfFuncs = udfFunctionMapper.listUdfByResourceId(new Integer[]{resourceId}); - if (CollectionUtils.isNotEmpty(udfFuncs)) { - udfFuncs = udfFuncs.stream().map(t -> { - t.setResourceName(fullName); - t.setUpdateTime(now); - return t; - }).collect(Collectors.toList()); - udfFunctionMapper.batchUpdateUdfFunc(udfFuncs); - } - - } - - putMsg(result, Status.SUCCESS); - Map dataMap = new BeanMap(resource); - Map resultMap = new HashMap<>(5); - for (Map.Entry entry: dataMap.entrySet()) { - if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) { - resultMap.put(entry.getKey().toString(), entry.getValue()); - } - } - result.setData(resultMap); - } catch (Exception e) { - logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e); - throw new ServiceException(Status.UPDATE_RESOURCE_ERROR); - } - - // if name unchanged, return directly without moving on HDFS - if (originResourceName.equals(name) && file == null) { - return result; - } - - if (file != null) { - // fail upload - if (!upload(loginUser, fullName, file, type)) { - logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename()); - putMsg(result, Status.HDFS_OPERATION_ERROR); - throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename())); - } - if (!fullName.equals(originFullName)) { - try { - HadoopUtils.getInstance().delete(originHdfsFileName,false); - } catch (IOException e) { - logger.error(e.getMessage(),e); - throw new RuntimeException(String.format("delete resource: %s failed.", originFullName)); - } - } - return result; - } - - - // get the path of dest file in hdfs - String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName); - - - try { - logger.info("start hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName); - HadoopUtils.getInstance().copy(originHdfsFileName, destHdfsFileName, true, true); - } catch (Exception e) { - logger.error(MessageFormat.format("hdfs copy {0} -> {1} fail", originHdfsFileName, destHdfsFileName), e); - putMsg(result,Status.HDFS_COPY_FAIL); - throw new ServiceException(Status.HDFS_COPY_FAIL); - } 
- - return result; - - } + Result updateResource(User loginUser, + int resourceId, + String name, + String desc, + ResourceType type, + MultipartFile file); /** * query resources list paging @@ -539,99 +97,7 @@ public class ResourcesService extends BaseService { * @param pageSize page size * @return resource list page */ - public Map queryResourceListPaging(User loginUser, int direcotryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) { - - HashMap result = new HashMap<>(5); - Page page = new Page(pageNo, pageSize); - int userId = loginUser.getId(); - if (isAdmin(loginUser)) { - userId= 0; - } - if (direcotryId != -1) { - Resource directory = resourcesMapper.selectById(direcotryId); - if (directory == null) { - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - } - - IPage resourceIPage = resourcesMapper.queryResourcePaging(page, - userId,direcotryId, type.ordinal(), searchVal); - PageInfo pageInfo = new PageInfo(pageNo, pageSize); - pageInfo.setTotalCount((int)resourceIPage.getTotal()); - pageInfo.setLists(resourceIPage.getRecords()); - result.put(Constants.DATA_LIST, pageInfo); - putMsg(result,Status.SUCCESS); - return result; - } - - /** - * create direcoty - * @param loginUser login user - * @param fullName full name - * @param type resource type - * @param result Result - */ - private void createDirecotry(User loginUser,String fullName,ResourceType type,Result result){ - // query tenant - String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode(); - String directoryName = HadoopUtils.getHdfsFileName(type,tenantCode,fullName); - String resourceRootPath = HadoopUtils.getHdfsDir(type,tenantCode); - try { - if (!HadoopUtils.getInstance().exists(resourceRootPath)) { - createTenantDirIfNotExists(tenantCode); - } - - if (!HadoopUtils.getInstance().mkdir(directoryName)) { - logger.error("create resource directory {} of hdfs failed",directoryName); - putMsg(result,Status.HDFS_OPERATION_ERROR); - throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName)); - } - } catch (Exception e) { - logger.error("create resource directory {} of hdfs failed",directoryName); - putMsg(result,Status.HDFS_OPERATION_ERROR); - throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName)); - } - } - - /** - * upload file to hdfs - * - * @param loginUser login user - * @param fullName full name - * @param file file - */ - private boolean upload(User loginUser, String fullName, MultipartFile file, ResourceType type) { - // save to local - String fileSuffix = FileUtils.suffix(file.getOriginalFilename()); - String nameSuffix = FileUtils.suffix(fullName); - - // determine file suffix - if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) { - return false; - } - // query tenant - String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode(); - // random file name - String localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString()); - - - // save file to hdfs, and delete original file - String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName); - String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode); - try { - // if tenant dir not exists - if (!HadoopUtils.getInstance().exists(resourcePath)) { - createTenantDirIfNotExists(tenantCode); - } - org.apache.dolphinscheduler.api.utils.FileUtils.copyFile(file, localFilename); - HadoopUtils.getInstance().copyLocalToHdfs(localFilename, 
hdfsFilename, true, true); - } catch (Exception e) { - logger.error(e.getMessage(), e); - return false; - } - return true; - } + Map queryResourceListPaging(User loginUser, int directoryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize); /** * query resource list @@ -640,21 +106,7 @@ public class ResourcesService extends BaseService { * @param type resource type * @return resource list */ - public Map queryResourceList(User loginUser, ResourceType type) { - - Map result = new HashMap<>(5); - - int userId = loginUser.getId(); - if(isAdmin(loginUser)){ - userId = 0; - } - List allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0); - Visitor resourceTreeVisitor = new ResourceTreeVisitor(allResourceList); - result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren()); - putMsg(result,Status.SUCCESS); - - return result; - } + Map queryResourceList(User loginUser, ResourceType type); /** * query resource list by program type @@ -663,33 +115,7 @@ public class ResourcesService extends BaseService { * @param type resource type * @return resource list */ - public Map queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) { - - Map result = new HashMap<>(5); - String suffix = ".jar"; - int userId = loginUser.getId(); - if(isAdmin(loginUser)){ - userId = 0; - } - if (programType != null) { - switch (programType) { - case JAVA: - break; - case SCALA: - break; - case PYTHON: - suffix = ".py"; - break; - } - } - List allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0); - List resources = new ResourceFilter(suffix,new ArrayList<>(allResourceList)).filter(); - Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources); - result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren()); - putMsg(result,Status.SUCCESS); - - return result; - } + Map queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType); /** * delete resource @@ -697,82 +123,9 @@ public class ResourcesService extends BaseService { * @param loginUser login user * @param resourceId resource id * @return delete result code - * @throws Exception exception + * @throws IOException exception */ - @Transactional(rollbackFor = Exception.class) - public Result delete(User loginUser, int resourceId) throws Exception { - Result result = new Result(); - - // if resource upload startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - - //get resource and hdfs path - Resource resource = resourcesMapper.selectById(resourceId); - if (resource == null) { - logger.error("resource file not exist, resource id {}", resourceId); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - if (!hasPerm(loginUser, resource.getUserId())) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - - String tenantCode = getTenantCode(resource.getUserId(),result); - if (StringUtils.isEmpty(tenantCode)){ - return result; - } - - // get all resource id of process definitions those is released - List> list = processDefinitionMapper.listResources(); - Map> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list); - Set resourceIdSet = resourceProcessMap.keySet(); - // get all children of the resource - List allChildren = listAllChildren(resource,true); - Integer[] 
needDeleteResourceIdArray = allChildren.toArray(new Integer[allChildren.size()]); - - //if resource type is UDF,need check whether it is bound by UDF functon - if (resource.getType() == (ResourceType.UDF)) { - List udfFuncs = udfFunctionMapper.listUdfByResourceId(needDeleteResourceIdArray); - if (CollectionUtils.isNotEmpty(udfFuncs)) { - logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString()); - putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName()); - return result; - } - } - - if (resourceIdSet.contains(resource.getPid())) { - logger.error("can't be deleted,because it is used of process definition"); - putMsg(result, Status.RESOURCE_IS_USED); - return result; - } - resourceIdSet.retainAll(allChildren); - if (CollectionUtils.isNotEmpty(resourceIdSet)) { - logger.error("can't be deleted,because it is used of process definition"); - for (Integer resId : resourceIdSet) { - logger.error("resource id:{} is used of process definition {}",resId,resourceProcessMap.get(resId)); - } - putMsg(result, Status.RESOURCE_IS_USED); - return result; - } - - // get hdfs file by type - String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName()); - - //delete data in database - resourcesMapper.deleteIds(needDeleteResourceIdArray); - resourceUserMapper.deleteResourceUserArray(0, needDeleteResourceIdArray); - - //delete file on hdfs - HadoopUtils.getInstance().delete(hdfsFilename, true); - putMsg(result, Status.SUCCESS); - - return result; - } + Result delete(User loginUser, int resourceId) throws IOException; /** * verify resource by name and type @@ -781,37 +134,7 @@ public class ResourcesService extends BaseService { * @param type resource type * @return true if the resource name not exists, otherwise return false */ - public Result verifyResourceName(String fullName, ResourceType type,User loginUser) { - Result result = new Result(); - putMsg(result, Status.SUCCESS); - if (checkResourceExists(fullName, 0, type.ordinal())) { - logger.error("resource type:{} name:{} has exist, can't create again.", type, fullName); - putMsg(result, Status.RESOURCE_EXIST); - } else { - // query tenant - Tenant tenant = tenantMapper.queryById(loginUser.getTenantId()); - if(tenant != null){ - String tenantCode = tenant.getTenantCode(); - - try { - String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName); - if(HadoopUtils.getInstance().exists(hdfsFilename)){ - logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, fullName,hdfsFilename); - putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename); - } - - } catch (Exception e) { - logger.error(e.getMessage(),e); - putMsg(result,Status.HDFS_OPERATION_ERROR); - } - }else{ - putMsg(result,Status.TENANT_NOT_EXIST); - } - } - - - return result; - } + Result verifyResourceName(String fullName, ResourceType type,User loginUser); /** * verify resource by full name or pid and type @@ -820,40 +143,7 @@ public class ResourcesService extends BaseService { * @param type resource type * @return true if the resource full name or pid not exists, otherwise return false */ - public Result queryResource(String fullName,Integer id,ResourceType type) { - Result result = new Result(); - if (StringUtils.isBlank(fullName) && id == null) { - logger.error("You must input one of fullName and pid"); - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR); - return result; - } - if (StringUtils.isNotBlank(fullName)) { - List resourceList = 
resourcesMapper.queryResource(fullName,type.ordinal()); - if (CollectionUtils.isEmpty(resourceList)) { - logger.error("resource file not exist, resource full name {} ", fullName); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - putMsg(result, Status.SUCCESS); - result.setData(resourceList.get(0)); - } else { - Resource resource = resourcesMapper.selectById(id); - if (resource == null) { - logger.error("resource file not exist, resource id {}", id); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - Resource parentResource = resourcesMapper.selectById(resource.getPid()); - if (parentResource == null) { - logger.error("parent resource file not exist, resource id {}", id); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - putMsg(result, Status.SUCCESS); - result.setData(parentResource); - } - return result; - } + Result queryResource(String fullName,Integer id,ResourceType type); /** * view resource file online @@ -863,64 +153,7 @@ public class ResourcesService extends BaseService { * @param limit limit * @return resource content */ - public Result readResource(int resourceId, int skipLineNum, int limit) { - Result result = new Result(); - - // if resource upload startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - - // get resource by id - Resource resource = resourcesMapper.selectById(resourceId); - if (resource == null) { - logger.error("resource file not exist, resource id {}", resourceId); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - //check preview or not by file suffix - String nameSuffix = FileUtils.suffix(resource.getAlias()); - String resourceViewSuffixs = FileUtils.getResourceViewSuffixs(); - if (StringUtils.isNotEmpty(resourceViewSuffixs)) { - List strList = Arrays.asList(resourceViewSuffixs.split(",")); - if (!strList.contains(nameSuffix)) { - logger.error("resource suffix {} not support view, resource id {}", nameSuffix, resourceId); - putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW); - return result; - } - } - - String tenantCode = getTenantCode(resource.getUserId(),result); - if (StringUtils.isEmpty(tenantCode)){ - return result; - } - - // hdfs path - String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getFullName()); - logger.info("resource hdfs path is {} ", hdfsFileName); - try { - if(HadoopUtils.getInstance().exists(hdfsFileName)){ - List content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit); - - putMsg(result, Status.SUCCESS); - Map map = new HashMap<>(); - map.put(ALIAS, resource.getAlias()); - map.put(CONTENT, String.join("\n", content)); - result.setData(map); - }else{ - logger.error("read file {} not exist in hdfs", hdfsFileName); - putMsg(result, Status.RESOURCE_FILE_NOT_EXIST,hdfsFileName); - } - - } catch (Exception e) { - logger.error("Resource {} read failed", hdfsFileName, e); - putMsg(result, Status.HDFS_OPERATION_ERROR); - } - - return result; - } + Result readResource(int resourceId, int skipLineNum, int limit); /** * create resource file online @@ -933,73 +166,7 @@ public class ResourcesService extends BaseService { * @param content content * @return create result code */ - @Transactional(rollbackFor = Exception.class) - public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int 
pid,String currentDirectory) { - Result result = new Result(); - // if resource upload startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - - //check file suffix - String nameSuffix = fileSuffix.trim(); - String resourceViewSuffixs = FileUtils.getResourceViewSuffixs(); - if (StringUtils.isNotEmpty(resourceViewSuffixs)) { - List strList = Arrays.asList(resourceViewSuffixs.split(",")); - if (!strList.contains(nameSuffix)) { - logger.error("resouce suffix {} not support create", nameSuffix); - putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW); - return result; - } - } - - String name = fileName.trim() + "." + nameSuffix; - String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name); - - result = verifyResourceName(fullName,type,loginUser); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - return result; - } - if (pid != -1) { - Resource parentResource = resourcesMapper.selectById(pid); - - if (parentResource == null) { - putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST); - return result; - } - - if (!hasPerm(loginUser, parentResource.getUserId())) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - } - - // save data - Date now = new Date(); - Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now); - - resourcesMapper.insert(resource); - - putMsg(result, Status.SUCCESS); - Map dataMap = new BeanMap(resource); - Map resultMap = new HashMap<>(); - for (Map.Entry entry: dataMap.entrySet()) { - if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) { - resultMap.put(entry.getKey().toString(), entry.getValue()); - } - } - result.setData(resultMap); - - String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode(); - - result = uploadContentToHdfs(fullName, tenantCode, content); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - throw new RuntimeException(result.getMsg()); - } - return result; - } + Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory); /** * updateProcessInstance resource * * @param resourceId resource id * @param content content * @return update result code */ - @Transactional(rollbackFor = Exception.class) - public Result updateResourceContent(int resourceId, String content) { - Result result = new Result(); - - // if resource upload startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - - Resource resource = resourcesMapper.selectById(resourceId); - if (resource == null) { - logger.error("read file not exist, resource id {}", resourceId); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - //check can edit by file suffix - String nameSuffix = FileUtils.suffix(resource.getAlias()); - String resourceViewSuffixs = FileUtils.getResourceViewSuffixs(); - if (StringUtils.isNotEmpty(resourceViewSuffixs)) { - List strList = Arrays.asList(resourceViewSuffixs.split(",")); - if (!strList.contains(nameSuffix)) { - logger.error("resource suffix {} not support updateProcessInstance,
resource id {}", nameSuffix, resourceId); - putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW); - return result; - } - } - - String tenantCode = getTenantCode(resource.getUserId(),result); - if (StringUtils.isEmpty(tenantCode)){ - return result; - } - resource.setSize(content.getBytes().length); - resource.setUpdateTime(new Date()); - resourcesMapper.updateById(resource); - - - result = uploadContentToHdfs(resource.getFullName(), tenantCode, content); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - throw new RuntimeException(result.getMsg()); - } - return result; - } - - /** - * @param resourceName resource name - * @param tenantCode tenant code - * @param content content - * @return result - */ - private Result uploadContentToHdfs(String resourceName, String tenantCode, String content) { - Result result = new Result(); - String localFilename = ""; - String hdfsFileName = ""; - try { - localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString()); - - if (!FileUtils.writeContent2File(content, localFilename)) { - // write file fail - logger.error("file {} fail, content is {}", localFilename, content); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - - // get resource file hdfs path - hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName); - String resourcePath = HadoopUtils.getHdfsResDir(tenantCode); - logger.info("resource hdfs path is {} ", hdfsFileName); - - HadoopUtils hadoopUtils = HadoopUtils.getInstance(); - if (!hadoopUtils.exists(resourcePath)) { - // create if tenant dir not exists - createTenantDirIfNotExists(tenantCode); - } - if (hadoopUtils.exists(hdfsFileName)) { - hadoopUtils.delete(hdfsFileName, false); - } - - hadoopUtils.copyLocalToHdfs(localFilename, hdfsFileName, true, true); - } catch (Exception e) { - logger.error(e.getMessage(), e); - result.setCode(Status.HDFS_OPERATION_ERROR.getCode()); - result.setMsg(String.format("copy %s to hdfs %s fail", localFilename, hdfsFileName)); - return result; - } - putMsg(result, Status.SUCCESS); - return result; - } - + Result updateResourceContent(int resourceId, String content); /** * download file * * @param resourceId resource id * @return resource content - * @throws Exception exception + * @throws IOException exception */ - public org.springframework.core.io.Resource downloadResource(int resourceId) throws Exception { - // if resource upload startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - throw new RuntimeException("hdfs not startup"); - } - - Resource resource = resourcesMapper.selectById(resourceId); - if (resource == null) { - logger.error("download file not exist, resource id {}", resourceId); - return null; - } - if (resource.isDirectory()) { - logger.error("resource id {} is directory,can't download it", resourceId); - throw new RuntimeException("cant't download directory"); - } - - int userId = resource.getUserId(); - User user = userMapper.selectById(userId); - if(user == null){ - logger.error("user id {} not exists", userId); - throw new RuntimeException(String.format("resource owner id %d not exist",userId)); - } - - Tenant tenant = tenantMapper.queryById(user.getTenantId()); - if(tenant == null){ - logger.error("tenant id {} not exists", user.getTenantId()); - throw new RuntimeException(String.format("The tenant id %d of resource owner not exist",user.getTenantId())); - } - - String tenantCode = tenant.getTenantCode(); - - String 
hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName()); - - String localFileName = FileUtils.getDownloadFilename(resource.getAlias()); - logger.info("resource hdfs path is {} ", hdfsFileName); - - HadoopUtils.getInstance().copyHdfsToLocal(hdfsFileName, localFileName, false, true); - return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName); - } - + org.springframework.core.io.Resource downloadResource(int resourceId) throws IOException; /** * list all file @@ -1155,25 +193,7 @@ public class ResourcesService extends BaseService { * @param userId user id * @return unauthorized result code */ - public Map authorizeResourceTree(User loginUser, Integer userId) { - - Map result = new HashMap<>(); - if (isNotAdmin(loginUser, result)) { - return result; - } - List resourceList = resourcesMapper.queryResourceExceptUserId(userId); - List list; - if (CollectionUtils.isNotEmpty(resourceList)) { - Visitor visitor = new ResourceTreeVisitor(resourceList); - list = visitor.visit().getChildren(); - } else { - list = new ArrayList<>(0); - } - - result.put(Constants.DATA_LIST, list); - putMsg(result, Status.SUCCESS); - return result; - } + Map authorizeResourceTree(User loginUser, Integer userId); /** * unauthorized file @@ -1182,28 +202,7 @@ public class ResourcesService extends BaseService { * @param userId user id * @return unauthorized result code */ - public Map unauthorizedFile(User loginUser, Integer userId) { - - Map result = new HashMap<>(); - if (isNotAdmin(loginUser, result)) { - return result; - } - List resourceList = resourcesMapper.queryResourceExceptUserId(userId); - List list; - if (resourceList != null && resourceList.size() > 0) { - Set resourceSet = new HashSet<>(resourceList); - List authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId); - - getAuthorizedResourceList(resourceSet, authedResourceList); - list = new ArrayList<>(resourceSet); - } else { - list = new ArrayList<>(0); - } - Visitor visitor = new ResourceTreeVisitor(list); - result.put(Constants.DATA_LIST, visitor.visit().getChildren()); - putMsg(result, Status.SUCCESS); - return result; - } + Map unauthorizedFile(User loginUser, Integer userId); /** * unauthorized udf function @@ -1212,29 +211,7 @@ public class ResourcesService extends BaseService { * @param userId user id * @return unauthorized result code */ - public Map unauthorizedUDFFunction(User loginUser, Integer userId) { - Map result = new HashMap<>(5); - //only admin can operate - if (isNotAdmin(loginUser, result)) { - return result; - } - - List udfFuncList = udfFunctionMapper.queryUdfFuncExceptUserId(userId); - List resultList = new ArrayList<>(); - Set udfFuncSet = null; - if (CollectionUtils.isNotEmpty(udfFuncList)) { - udfFuncSet = new HashSet<>(udfFuncList); - - List authedUDFFuncList = udfFunctionMapper.queryAuthedUdfFunc(userId); - - getAuthorizedResourceList(udfFuncSet, authedUDFFuncList); - resultList = new ArrayList<>(udfFuncSet); - } - result.put(Constants.DATA_LIST, resultList); - putMsg(result, Status.SUCCESS); - return result; - } - + Map unauthorizedUDFFunction(User loginUser, Integer userId); /** * authorized udf function @@ -1243,17 +220,7 @@ public class ResourcesService extends BaseService { * @param userId user id * @return authorized result code */ - public Map authorizedUDFFunction(User loginUser, Integer userId) { - Map result = new HashMap<>(); - if (isNotAdmin(loginUser, result)) { - return result; - } - List udfFuncs = 
udfFunctionMapper.queryAuthedUdfFunc(userId); - result.put(Constants.DATA_LIST, udfFuncs); - putMsg(result, Status.SUCCESS); - return result; - } - + Map authorizedUDFFunction(User loginUser, Integer userId); /** * authorized file @@ -1262,91 +229,6 @@ public class ResourcesService extends BaseService { * @param userId user id * @return authorized result */ - public Map authorizedFile(User loginUser, Integer userId) { - Map result = new HashMap<>(5); - if (isNotAdmin(loginUser, result)) { - return result; - } - List authedResources = resourcesMapper.queryAuthorizedResourceList(userId); - Visitor visitor = new ResourceTreeVisitor(authedResources); - String visit = JSONUtils.toJsonString(visitor.visit(), SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); - logger.info(visit); - String jsonTreeStr = JSONUtils.toJsonString(visitor.visit().getChildren(), SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); - logger.info(jsonTreeStr); - result.put(Constants.DATA_LIST, visitor.visit().getChildren()); - putMsg(result,Status.SUCCESS); - return result; - } - - /** - * get authorized resource list - * - * @param resourceSet resource set - * @param authedResourceList authorized resource list - */ - private void getAuthorizedResourceList(Set resourceSet, List authedResourceList) { - Set authedResourceSet = null; - if (CollectionUtils.isNotEmpty(authedResourceList)) { - authedResourceSet = new HashSet<>(authedResourceList); - resourceSet.removeAll(authedResourceSet); - } - } - - /** - * get tenantCode by UserId - * - * @param userId user id - * @param result return result - * @return - */ - private String getTenantCode(int userId,Result result){ - - User user = userMapper.selectById(userId); - if (user == null) { - logger.error("user {} not exists", userId); - putMsg(result, Status.USER_NOT_EXIST,userId); - return null; - } - - Tenant tenant = tenantMapper.queryById(user.getTenantId()); - if (tenant == null){ - logger.error("tenant not exists"); - putMsg(result, Status.TENANT_NOT_EXIST); - return null; - } - return tenant.getTenantCode(); - } - - /** - * list all children id - * @param resource resource - * @param containSelf whether add self to children list - * @return all children id - */ - List listAllChildren(Resource resource,boolean containSelf){ - List childList = new ArrayList<>(); - if (resource.getId() != -1 && containSelf) { - childList.add(resource.getId()); - } - - if(resource.isDirectory()){ - listAllChildren(resource.getId(),childList); - } - return childList; - } - - /** - * list all children id - * @param resourceId resource id - * @param childList child list - */ - void listAllChildren(int resourceId,List childList){ - - List children = resourcesMapper.listChildren(resourceId); - for(int chlidId:children){ - childList.add(chlidId); - listAllChildren(chlidId,childList); - } - } + Map authorizedFile(User loginUser, Integer userId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java index 55880ad63c..18f3ebf55f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java @@ -17,77 +17,18 @@ package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.dto.ScheduleParam; -import org.apache.dolphinscheduler.api.enums.Status; -import 
org.apache.dolphinscheduler.api.exceptions.ServiceException; -import org.apache.dolphinscheduler.api.utils.PageInfo; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob; -import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; -import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.quartz.CronExpression; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; - /** * scheduler service */ -@Service -public class SchedulerService extends BaseService { - - private static final Logger logger = LoggerFactory.getLogger(SchedulerService.class); - - @Autowired - private ProjectService projectService; - - @Autowired - private ExecutorService executorService; - - @Autowired - private MonitorService monitorService; - - @Autowired - private ProcessService processService; - - @Autowired - private ScheduleMapper scheduleMapper; - - @Autowired - private ProjectMapper projectMapper; - - @Autowired - private ProcessDefinitionMapper processDefinitionMapper; +public interface SchedulerService { /** * save schedule @@ -103,80 +44,14 @@ public class SchedulerService extends BaseService { * @param workerGroup worker group * @return create result code */ - @Transactional(rollbackFor = RuntimeException.class) - public Map insertSchedule(User loginUser, String projectName, - Integer processDefineId, - String schedule, - WarningType warningType, - int warningGroupId, - FailureStrategy failureStrategy, - Priority processInstancePriority, - String workerGroup) { - - Map result = new HashMap(); - - Project project = projectMapper.queryByName(projectName); - - // check project auth - boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result); - if (!hasProjectAndPerm) { - return result; - } - - // check work flow define release state - ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId); - result = executorService.checkProcessDefinitionValid(processDefinition, 
processDefineId); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - - Schedule scheduleObj = new Schedule(); - Date now = new Date(); - - scheduleObj.setProjectName(projectName); - scheduleObj.setProcessDefinitionId(processDefinition.getId()); - scheduleObj.setProcessDefinitionName(processDefinition.getName()); - - ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class); - if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) { - logger.warn("The start time must not be the same as the end"); - putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME); - return result; - } - scheduleObj.setStartTime(scheduleParam.getStartTime()); - scheduleObj.setEndTime(scheduleParam.getEndTime()); - if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) { - logger.error(scheduleParam.getCrontab() + " verify failure"); - - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab()); - return result; - } - scheduleObj.setCrontab(scheduleParam.getCrontab()); - scheduleObj.setWarningType(warningType); - scheduleObj.setWarningGroupId(warningGroupId); - scheduleObj.setFailureStrategy(failureStrategy); - scheduleObj.setCreateTime(now); - scheduleObj.setUpdateTime(now); - scheduleObj.setUserId(loginUser.getId()); - scheduleObj.setUserName(loginUser.getUserName()); - scheduleObj.setReleaseState(ReleaseState.OFFLINE); - scheduleObj.setProcessInstancePriority(processInstancePriority); - scheduleObj.setWorkerGroup(workerGroup); - scheduleMapper.insert(scheduleObj); - - /** - * updateProcessInstance receivers and cc by process definition id - */ - processDefinition.setWarningGroupId(warningGroupId); - processDefinitionMapper.updateById(processDefinition); - - // return scheduler object with ID - result.put(Constants.DATA_LIST, scheduleMapper.selectById(scheduleObj.getId())); - putMsg(result, Status.SUCCESS); - - result.put("scheduleId", scheduleObj.getId()); - return result; - } + Map insertSchedule(User loginUser, String projectName, + Integer processDefineId, + String schedule, + WarningType warningType, + int warningGroupId, + FailureStrategy failureStrategy, + Priority processInstancePriority, + String workerGroup); /** * updateProcessInstance schedule @@ -193,95 +68,16 @@ public class SchedulerService extends BaseService { * @param scheduleStatus schedule status * @return update result code */ - @Transactional(rollbackFor = RuntimeException.class) - public Map updateSchedule(User loginUser, - String projectName, - Integer id, - String scheduleExpression, - WarningType warningType, - int warningGroupId, - FailureStrategy failureStrategy, - ReleaseState scheduleStatus, - Priority processInstancePriority, - String workerGroup) { - Map result = new HashMap(5); - - Project project = projectMapper.queryByName(projectName); - - // check project auth - boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result); - if (!hasProjectAndPerm) { - return result; - } - - // check schedule exists - Schedule schedule = scheduleMapper.selectById(id); - - if (schedule == null) { - putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id); - return result; - } - - ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId()); - if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId()); - return result; - } - - /** - * scheduling on-line status forbid modification - */ - if 
(checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) { - return result; - } - - Date now = new Date(); - - // updateProcessInstance param - if (StringUtils.isNotEmpty(scheduleExpression)) { - ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class); - if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) { - logger.warn("The start time must not be the same as the end"); - putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME); - return result; - } - schedule.setStartTime(scheduleParam.getStartTime()); - schedule.setEndTime(scheduleParam.getEndTime()); - if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) { - putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab()); - return result; - } - schedule.setCrontab(scheduleParam.getCrontab()); - } - - if (warningType != null) { - schedule.setWarningType(warningType); - } - - schedule.setWarningGroupId(warningGroupId); - - if (failureStrategy != null) { - schedule.setFailureStrategy(failureStrategy); - } - - if (scheduleStatus != null) { - schedule.setReleaseState(scheduleStatus); - } - schedule.setWorkerGroup(workerGroup); - schedule.setUpdateTime(now); - schedule.setProcessInstancePriority(processInstancePriority); - scheduleMapper.updateById(schedule); - - /** - * updateProcessInstance recipients and cc by process definition ID - */ - processDefinition.setWarningGroupId(warningGroupId); - - processDefinitionMapper.updateById(processDefinition); - - putMsg(result, Status.SUCCESS); - return result; - } + Map updateSchedule(User loginUser, + String projectName, + Integer id, + String scheduleExpression, + WarningType warningType, + int warningGroupId, + FailureStrategy failureStrategy, + ReleaseState scheduleStatus, + Priority processInstancePriority, + String workerGroup); /** @@ -293,110 +89,10 @@ public class SchedulerService extends BaseService { * @param scheduleStatus schedule status * @return publish result code */ - @Transactional(rollbackFor = RuntimeException.class) - public Map setScheduleState(User loginUser, - String projectName, - Integer id, - ReleaseState scheduleStatus) { - - Map result = new HashMap(5); - - Project project = projectMapper.queryByName(projectName); - // check project auth - boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result); - if (!hasProjectAndPerm) { - return result; - } - - // check schedule exists - Schedule scheduleObj = scheduleMapper.selectById(id); - - if (scheduleObj == null) { - putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id); - return result; - } - // check schedule release state - if (scheduleObj.getReleaseState() == scheduleStatus) { - logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}", - scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus); - putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus); - return result; - } - ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId()); - if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId()); - return result; - } - - if (scheduleStatus == ReleaseState.ONLINE) { - // check process definition release state - if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { - logger.info("not release process 
definition id: {} , name : {}", - processDefinition.getId(), processDefinition.getName()); - putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); - return result; - } - // check sub process definition release state - List subProcessDefineIds = new ArrayList<>(); - processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds); - Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]); - if (subProcessDefineIds.size() > 0) { - List subProcessDefinitionList = - processDefinitionMapper.queryDefinitionListByIdList(idArray); - if (subProcessDefinitionList != null && subProcessDefinitionList.size() > 0) { - for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) { - /** - * if there is no online process, exit directly - */ - if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) { - logger.info("not release process definition id: {} , name : {}", - subProcessDefinition.getId(), subProcessDefinition.getName()); - putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subProcessDefinition.getId()); - return result; - } - } - } - } - } - - // check master server exists - List masterServers = monitorService.getServerListFromZK(true); - - if (masterServers.size() == 0) { - putMsg(result, Status.MASTER_NOT_EXISTS); - return result; - } - - // set status - scheduleObj.setReleaseState(scheduleStatus); - - scheduleMapper.updateById(scheduleObj); - - try { - switch (scheduleStatus) { - case ONLINE: { - logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers); - setSchedule(project.getId(), scheduleObj); - break; - } - case OFFLINE: { - logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers); - deleteSchedule(project.getId(), id); - break; - } - default: { - putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString()); - return result; - } - } - } catch (Exception e) { - result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? 
"set online failure" : "set offline failure"); - throw new ServiceException(result.get(Constants.MSG).toString()); - } - - putMsg(result, Status.SUCCESS); - return result; - } + Map setScheduleState(User loginUser, + String projectName, + Integer id, + ReleaseState scheduleStatus); /** * query schedule @@ -409,36 +105,7 @@ public class SchedulerService extends BaseService { * @param searchVal search value * @return schedule list page */ - public Map querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize) { - - HashMap result = new HashMap<>(); - - Project project = projectMapper.queryByName(projectName); - - // check project auth - boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result); - if (!hasProjectAndPerm) { - return result; - } - - ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId); - if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId); - return result; - } - Page page = new Page(pageNo, pageSize); - IPage scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging( - page, processDefineId, searchVal - ); - - PageInfo pageInfo = new PageInfo(pageNo, pageSize); - pageInfo.setTotalCount((int) scheduleIPage.getTotal()); - pageInfo.setLists(scheduleIPage.getRecords()); - result.put(Constants.DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - - return result; - } + Map querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize); /** * query schedule list @@ -447,41 +114,7 @@ public class SchedulerService extends BaseService { * @param projectName project name * @return schedule list */ - public Map queryScheduleList(User loginUser, String projectName) { - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - // check project auth - boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result); - if (!hasProjectAndPerm) { - return result; - } - - List schedules = scheduleMapper.querySchedulerListByProjectName(projectName); - - result.put(Constants.DATA_LIST, schedules); - putMsg(result, Status.SUCCESS); - - return result; - } - - public void setSchedule(int projectId, Schedule schedule) { - - int scheduleId = schedule.getId(); - logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId); - - Date startDate = schedule.getStartTime(); - Date endDate = schedule.getEndTime(); - - String jobName = QuartzExecutors.buildJobName(scheduleId); - String jobGroupName = QuartzExecutors.buildJobGroupName(projectId); - - Map dataMap = QuartzExecutors.buildDataMap(projectId, scheduleId, schedule); - - QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate, - schedule.getCrontab(), dataMap); - - } + Map queryScheduleList(User loginUser, String projectName); /** * delete schedule @@ -490,35 +123,7 @@ public class SchedulerService extends BaseService { * @param scheduleId schedule id * @throws RuntimeException runtime exception */ - public static void deleteSchedule(int projectId, int scheduleId) { - logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId); - - String jobName = QuartzExecutors.buildJobName(scheduleId); - String jobGroupName = QuartzExecutors.buildJobGroupName(projectId); - - if (!QuartzExecutors.getInstance().deleteJob(jobName, jobGroupName)) { - 
logger.warn("set offline failure:projectId:{},scheduleId:{}", projectId, scheduleId); - throw new ServiceException("set offline failure"); - } - - } - - /** - * check valid - * - * @param result result - * @param bool bool - * @param status status - * @return check result code - */ - private boolean checkValid(Map result, boolean bool, Status status) { - // timeout is valid - if (bool) { - putMsg(result, status); - return true; - } - return false; - } + void deleteSchedule(int projectId, int scheduleId); /** * delete schedule by id @@ -528,46 +133,7 @@ public class SchedulerService extends BaseService { * @param scheduleId scheule id * @return delete result code */ - public Map deleteScheduleById(User loginUser, String projectName, Integer scheduleId) { - - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - - Schedule schedule = scheduleMapper.selectById(scheduleId); - - if (schedule == null) { - putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId); - return result; - } - - // Determine if the login user is the owner of the schedule - if (loginUser.getId() != schedule.getUserId() - && loginUser.getUserType() != UserType.ADMIN_USER) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - - // check schedule is already online - if (schedule.getReleaseState() == ReleaseState.ONLINE) { - putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId()); - return result; - } - - int delete = scheduleMapper.deleteById(scheduleId); - - if (delete > 0) { - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR); - } - return result; - } + Map deleteScheduleById(User loginUser, String projectName, Integer scheduleId); /** * preview schedule @@ -577,24 +143,5 @@ public class SchedulerService extends BaseService { * @param schedule schedule expression * @return the next five fire time */ - public Map previewSchedule(User loginUser, String projectName, String schedule) { - Map result = new HashMap<>(); - CronExpression cronExpression; - ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class); - Date now = new Date(); - - Date startTime = now.after(scheduleParam.getStartTime()) ? 
now : scheduleParam.getStartTime(); - Date endTime = scheduleParam.getEndTime(); - try { - cronExpression = CronUtils.parse2CronExpression(scheduleParam.getCrontab()); - } catch (ParseException e) { - logger.error(e.getMessage(), e); - putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR); - return result; - } - List selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT); - result.put(Constants.DATA_LIST, selfFireDateList.stream().map(t -> DateUtils.dateToString(t))); - putMsg(result, Status.SUCCESS); - return result; - } + Map previewSchedule(User loginUser, String projectName, String schedule); } \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index 6c68202313..cbbc89bde0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -17,57 +17,15 @@ package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.PageInfo; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; -import org.apache.dolphinscheduler.service.process.ProcessService; -import java.text.MessageFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Set; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; /** * task instance service */ -@Service -public class TaskInstanceService extends BaseService { - - @Autowired - ProjectMapper projectMapper; - - @Autowired - ProjectService projectService; - - @Autowired - ProcessService processService; - - @Autowired - TaskInstanceMapper taskInstanceMapper; - - @Autowired - ProcessInstanceService processInstanceService; - - @Autowired - UsersService usersService; +public interface TaskInstanceService { /** * query task list by project, process instance, task name, task start time, task end time, task status, keyword paging @@ -85,65 +43,10 @@ public class TaskInstanceService extends BaseService { * @param pageSize page size * @return task list page */ - public Map queryTaskListPaging(User loginUser, String projectName, - Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate, - String endDate, String searchVal, ExecutionStatus stateType, String host, - Integer pageNo, Integer pageSize) { - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = 
projectService.checkProjectAndAuth(loginUser, project, projectName); - Status status = (Status) checkResult.get(Constants.STATUS); - if (status != Status.SUCCESS) { - return checkResult; - } - - int[] statusArray = null; - if (stateType != null) { - statusArray = new int[]{stateType.ordinal()}; - } - - Date start = null; - Date end = null; - if (StringUtils.isNotEmpty(startDate)) { - start = DateUtils.getScheduleDate(startDate); - if (start == null) { - return generateInvalidParamRes(result, "startDate"); - } - } - if (StringUtils.isNotEmpty(endDate)) { - end = DateUtils.getScheduleDate(endDate); - if (end == null) { - return generateInvalidParamRes(result, "endDate"); - } - } - - Page page = new Page(pageNo, pageSize); - PageInfo pageInfo = new PageInfo(pageNo, pageSize); - int executorId = usersService.getUserIdByName(executorName); - - IPage taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging( - page, project.getId(), processInstanceId, processInstanceName, searchVal, taskName, executorId, statusArray, host, start, end - ); - Set exclusionSet = new HashSet<>(); - exclusionSet.add(Constants.CLASS); - exclusionSet.add("taskJson"); - List taskInstanceList = taskInstanceIPage.getRecords(); - - for (TaskInstance taskInstance : taskInstanceList) { - taskInstance.setDuration(DateUtils.format2Duration(taskInstance.getStartTime(), taskInstance.getEndTime())); - User executor = usersService.queryUser(taskInstance.getExecutorId()); - if (null != executor) { - taskInstance.setExecutorName(executor.getUserName()); - } - } - pageInfo.setTotalCount((int) taskInstanceIPage.getTotal()); - pageInfo.setLists(CollectionUtils.getListByExclusion(taskInstanceIPage.getRecords(), exclusionSet)); - result.put(Constants.DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - - return result; - } + Map queryTaskListPaging(User loginUser, String projectName, + Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate, + String endDate, String searchVal, ExecutionStatus stateType, String host, + Integer pageNo, Integer pageSize); /** * change one task instance's state from failure to forced success @@ -153,51 +56,6 @@ public class TaskInstanceService extends BaseService { * @param taskInstanceId task instance id * @return the result code and msg */ - public Map forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) { - Map result = new HashMap<>(5); - Project project = projectMapper.queryByName(projectName); - - // check user auth - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status status = (Status) checkResult.get(Constants.STATUS); - if (status != Status.SUCCESS) { - return checkResult; - } + Map forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId); - // check whether the task instance can be found - TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); - if (task == null) { - putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); - return result; - } - - // check whether the task instance state type is failure - if (!task.getState().typeIsFailure()) { - putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString()); - return result; - } - - // change the state of the task instance - task.setState(ExecutionStatus.FORCED_SUCCESS); - int changedNum = taskInstanceMapper.updateById(task); - if (changedNum > 0) { - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); - } - - 
return result; - } - - /*** - * generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name - * @param result exist result map - * @param params invalid params name - * @return update result map - */ - private Map generateInvalidParamRes(Map result, String params) { - result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); - result.put(Constants.MSG, MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), params)); - return result; - } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java index da85621041..ba0d32e4ab 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java @@ -61,7 +61,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe * @return token list for page number and page size */ public Map queryAccessTokenList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); Page page = new Page<>(pageNo, pageSize); @@ -87,7 +87,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe * @return create result code */ public Map createToken(User loginUser, int userId, String expireTime, String token) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); if (!hasPerm(loginUser,userId)){ putMsg(result, Status.USER_NO_OPERATION_PERM); @@ -124,7 +124,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe * @return token string */ public Map generateToken(User loginUser, int userId, String expireTime) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); if (!hasPerm(loginUser,userId)){ putMsg(result, Status.USER_NO_OPERATION_PERM); return result; @@ -143,7 +143,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe * @return delete result code */ public Map delAccessTokenById(User loginUser, int id) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); AccessToken accessToken = accessTokenMapper.selectById(id); @@ -174,7 +174,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe * @return update result code */ public Map updateToken(User loginUser, int id, int userId, String expireTime, String token) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); if (!hasPerm(loginUser,userId)){ putMsg(result, Status.USER_NO_OPERATION_PERM); return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java index 7254fc1a88..b84c2795e8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java @@ -130,7 +130,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis private Map countStateByProject(User loginUser, int projectId, String startDate, String endDate , 
TriFunction> instanceStateCounter) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); boolean checkProject = checkProject(loginUser, projectId, result); if (!checkProject) { return result; @@ -193,7 +193,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis */ public Map countCommandState(User loginUser, int projectId, String startDate, String endDate) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); boolean checkProject = checkProject(loginUser, projectId, result); if (!checkProject) { return result; @@ -264,7 +264,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis * @return queue state count data */ public Map countQueueState(User loginUser, int projectId) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); boolean checkProject = checkProject(loginUser, projectId, result); if (!checkProject) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 2a2ae78618..3a92bef6c5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -146,6 +146,9 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Autowired private ProcessService processService; + @Autowired + private SchedulerService schedulerService; + /** * create process definition * @@ -273,7 +276,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Override public Map queryProcessDefinitionList(User loginUser, String projectName) { - HashMap result = new HashMap<>(5); + HashMap result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); @@ -399,7 +402,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements String desc, String locations, String connects) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); @@ -514,7 +517,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Transactional(rollbackFor = RuntimeException.class) public Map deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); @@ -634,7 +637,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements // set status schedule.setReleaseState(ReleaseState.OFFLINE); scheduleMapper.updateById(schedule); - SchedulerService.deleteSchedule(project.getId(), schedule.getId()); + schedulerService.deleteSchedule(project.getId(), schedule.getId()); } break; default: @@ -823,7 +826,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Override @Transactional(rollbackFor = RuntimeException.class) public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { - Map result = new HashMap<>(5); + Map result = new 
HashMap<>(); String processMetaJson = FileUtils.file2String(file); List processMetaList = JSONUtils.toList(processMetaJson, ProcessMeta.class); @@ -992,7 +995,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } //recursive sub-process parameter correction map key for old process id value for new process id - Map subProcessIdMap = new HashMap<>(20); + Map subProcessIdMap = new HashMap<>(); List subProcessList = StreamUtils.asStream(jsonArray.elements()) .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) @@ -1283,7 +1286,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Override public Map queryProcessDefinitionAllByProjectId(Integer projectId) { - HashMap result = new HashMap<>(5); + HashMap result = new HashMap<>(); List resourceList = processDefineMapper.queryAllDefinitionList(projectId); result.put(Constants.DATA_LIST, resourceList); @@ -1494,7 +1497,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements Integer processId, Project targetProject) throws JsonProcessingException { - Map result = new HashMap<>(5); + Map result = new HashMap<>(); ProcessDefinition processDefinition = processDefineMapper.selectById(processId); if (processDefinition == null) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java new file mode 100644 index 0000000000..bcc37cb606 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -0,0 +1,1313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.api.service.impl; + +import static org.apache.dolphinscheduler.common.Constants.ALIAS; +import static org.apache.dolphinscheduler.common.Constants.CONTENT; +import static org.apache.dolphinscheduler.common.Constants.JAR; + +import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; +import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter; +import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor; +import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor; +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.ResourcesService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.RegexUtils; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProgramType; +import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.ResourcesUser; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.UdfFunc; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; +import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; +import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils; + +import org.apache.commons.beanutils.BeanMap; + +import java.io.IOException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.multipart.MultipartFile; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.databind.SerializationFeature; + +/** + * resources service impl + */ +@Service +public class ResourcesServiceImpl extends BaseService implements ResourcesService { + + private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceImpl.class); + + @Autowired + private ResourceMapper resourcesMapper; + + @Autowired + private 
UdfFuncMapper udfFunctionMapper;
+
+    @Autowired
+    private TenantMapper tenantMapper;
+
+    @Autowired
+    private UserMapper userMapper;
+
+    @Autowired
+    private ResourceUserMapper resourceUserMapper;
+
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    /**
+     * create directory
+     *
+     * @param loginUser login user
+     * @param name alias
+     * @param description description
+     * @param type type
+     * @param pid parent id
+     * @param currentDir current directory
+     * @return create directory result
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public Result createDirectory(User loginUser,
+                                  String name,
+                                  String description,
+                                  ResourceType type,
+                                  int pid,
+                                  String currentDir) {
+        Result result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name) : String.format("%s/%s",currentDir,name);
+        result = verifyResource(loginUser, type, fullName, pid);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        if (checkResourceExists(fullName, 0, type.ordinal())) {
+            logger.error("resource directory {} already exists, can't recreate", fullName);
+            putMsg(result, Status.RESOURCE_EXIST);
+            return result;
+        }
+
+        Date now = new Date();
+
+        Resource resource = new Resource(pid,name,fullName,true,description,name,loginUser.getId(),type,0,now,now);
+
+        try {
+            resourcesMapper.insert(resource);
+            putMsg(result, Status.SUCCESS);
+            Map dataMap = new BeanMap(resource);
+            Map resultMap = new HashMap<>();
+            for (Map.Entry entry: dataMap.entrySet()) {
+                if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
+                    resultMap.put(entry.getKey().toString(), entry.getValue());
+                }
+            }
+            result.setData(resultMap);
+        } catch (DuplicateKeyException e) {
+            logger.error("resource directory {} already exists, can't recreate", fullName);
+            putMsg(result, Status.RESOURCE_EXIST);
+            return result;
+        } catch (Exception e) {
+            logger.error("resource already exists, can't recreate", e);
+            throw new ServiceException("resource already exists, can't recreate");
+        }
+        //create directory in hdfs
+        createDirectory(loginUser,fullName,type,result);
+        return result;
+    }
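+
+    // NOTE: createResource below follows the same pattern as createDirectory: insert the
+    // DB row first and touch HDFS last, throwing ServiceException on upload failure so
+    // the @Transactional(rollbackFor = Exception.class) annotation rolls the insert back.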
+
+    /**
+     * create resource
+     *
+     * @param loginUser login user
+     * @param name alias
+     * @param desc description
+     * @param file file
+     * @param type type
+     * @param pid parent id
+     * @param currentDir current directory
+     * @return create result code
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public Result createResource(User loginUser,
+                                 String name,
+                                 String desc,
+                                 ResourceType type,
+                                 MultipartFile file,
+                                 int pid,
+                                 String currentDir) {
+        Result result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        result = verifyPid(loginUser, pid);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        result = verifyFile(name, type, file);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        // check resource name exists
+        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name) : String.format("%s/%s",currentDir,name);
+        if (checkResourceExists(fullName, 0, type.ordinal())) {
+            logger.error("resource {} already exists, can't recreate", RegexUtils.escapeNRT(name));
+            putMsg(result, Status.RESOURCE_EXIST);
+            return result;
+        }
+
+        Date now = new Date();
+        Resource resource = new Resource(pid,name,fullName,false,desc,file.getOriginalFilename(),loginUser.getId(),type,file.getSize(),now,now);
+
+        try {
+            resourcesMapper.insert(resource);
+            putMsg(result, Status.SUCCESS);
+            Map dataMap = new BeanMap(resource);
+            Map resultMap = new HashMap<>();
+            for (Map.Entry entry: dataMap.entrySet()) {
+                if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
+                    resultMap.put(entry.getKey().toString(), entry.getValue());
+                }
+            }
+            result.setData(resultMap);
+        } catch (Exception e) {
+            logger.error("resource already exists, can't recreate", e);
+            throw new ServiceException("resource already exists, can't recreate");
+        }
+
+        // fail upload
+        if (!upload(loginUser, fullName, file, type)) {
+            logger.error("upload resource: {} file: {} failed.", RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(file.getOriginalFilename()));
+            putMsg(result, Status.HDFS_OPERATION_ERROR);
+            throw new ServiceException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
+        }
+        return result;
+    }
+
+    /**
+     * check whether the resource exists
+     *
+     * @param fullName fullName
+     * @param userId user id
+     * @param type type
+     * @return true if resource exists
+     */
+    private boolean checkResourceExists(String fullName, int userId, int type) {
+        List resources = resourcesMapper.queryResourceList(fullName, userId, type);
+        return resources != null && !resources.isEmpty();
+    }
+
+    /**
+     * update resource
+     * @param loginUser login user
+     * @param resourceId resource id
+     * @param name name
+     * @param desc description
+     * @param type resource type
+     * @param file resource file
+     * @return update result code
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public Result updateResource(User loginUser,
+                                 int resourceId,
+                                 String name,
+                                 String desc,
+                                 ResourceType type,
+                                 MultipartFile file) {
+        Result result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        Resource resource = resourcesMapper.selectById(resourceId);
+        if (resource == null) {
+            putMsg(result, Status.RESOURCE_NOT_EXIST);
+            return result;
+        }
+        if (!hasPerm(loginUser, resource.getUserId())) {
+            putMsg(result, Status.USER_NO_OPERATION_PERM);
+            return result;
+        }
+
+        if (file == null && name.equals(resource.getAlias()) && desc.equals(resource.getDescription())) {
+            putMsg(result, Status.SUCCESS);
+            return result;
+        }
+
+        //check resource already exists
+        String originFullName = resource.getFullName();
+        String originResourceName = resource.getAlias();
+
+        String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/") + 1),name);
+        if (!originResourceName.equals(name) && checkResourceExists(fullName, 0, type.ordinal())) {
+            logger.error("resource {} already exists, can't recreate", name);
+            putMsg(result, Status.RESOURCE_EXIST);
+            return result;
+        }
+
+        result = verifyFile(name, type, file);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        // query tenant by user id
+        String tenantCode = getTenantCode(resource.getUserId(),result);
+        if (StringUtils.isEmpty(tenantCode)) {
+            return result;
+        }
+        // verify whether the resource exists in storage
+        //
get the path of origin file in storage + String originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,originFullName); + try { + if (!HadoopUtils.getInstance().exists(originHdfsFileName)) { + logger.error("{} not exist", originHdfsFileName); + putMsg(result,Status.RESOURCE_NOT_EXIST); + return result; + } + } catch (IOException e) { + logger.error(e.getMessage(),e); + throw new ServiceException(Status.HDFS_OPERATION_ERROR); + } + + if (!resource.isDirectory()) { + //get the origin file suffix + String originSuffix = FileUtils.suffix(originFullName); + String suffix = FileUtils.suffix(fullName); + boolean suffixIsChanged = false; + if (StringUtils.isBlank(suffix) && StringUtils.isNotBlank(originSuffix)) { + suffixIsChanged = true; + } + if (StringUtils.isNotBlank(suffix) && !suffix.equals(originSuffix)) { + suffixIsChanged = true; + } + //verify whether suffix is changed + if (suffixIsChanged) { + //need verify whether this resource is authorized to other users + Map columnMap = new HashMap<>(); + columnMap.put("resources_id", resourceId); + + List resourcesUsers = resourceUserMapper.selectByMap(columnMap); + if (CollectionUtils.isNotEmpty(resourcesUsers)) { + List userIds = resourcesUsers.stream().map(ResourcesUser::getUserId).collect(Collectors.toList()); + List users = userMapper.selectBatchIds(userIds); + String userNames = users.stream().map(User::getUserName).collect(Collectors.toList()).toString(); + logger.error("resource is authorized to user {},suffix not allowed to be modified", userNames); + putMsg(result,Status.RESOURCE_IS_AUTHORIZED,userNames); + return result; + } + } + } + + // updateResource data + Date now = new Date(); + + resource.setAlias(name); + resource.setFullName(fullName); + resource.setDescription(desc); + resource.setUpdateTime(now); + if (file != null) { + resource.setFileName(file.getOriginalFilename()); + resource.setSize(file.getSize()); + } + + try { + resourcesMapper.updateById(resource); + if (resource.isDirectory()) { + List childrenResource = listAllChildren(resource,false); + if (CollectionUtils.isNotEmpty(childrenResource)) { + String matcherFullName = Matcher.quoteReplacement(fullName); + List childResourceList; + Integer[] childResIdArray = childrenResource.toArray(new Integer[childrenResource.size()]); + List resourceList = resourcesMapper.listResourceByIds(childResIdArray); + childResourceList = resourceList.stream().map(t -> { + t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName)); + t.setUpdateTime(now); + return t; + }).collect(Collectors.toList()); + resourcesMapper.batchUpdateResource(childResourceList); + + if (ResourceType.UDF.equals(resource.getType())) { + List udfFuncs = udfFunctionMapper.listUdfByResourceId(childResIdArray); + if (CollectionUtils.isNotEmpty(udfFuncs)) { + udfFuncs = udfFuncs.stream().map(t -> { + t.setResourceName(t.getResourceName().replaceFirst(originFullName, matcherFullName)); + t.setUpdateTime(now); + return t; + }).collect(Collectors.toList()); + udfFunctionMapper.batchUpdateUdfFunc(udfFuncs); + } + } + } + } else if (ResourceType.UDF.equals(resource.getType())) { + List udfFuncs = udfFunctionMapper.listUdfByResourceId(new Integer[]{resourceId}); + if (CollectionUtils.isNotEmpty(udfFuncs)) { + udfFuncs = udfFuncs.stream().map(t -> { + t.setResourceName(fullName); + t.setUpdateTime(now); + return t; + }).collect(Collectors.toList()); + udfFunctionMapper.batchUpdateUdfFunc(udfFuncs); + } + + } + + putMsg(result, Status.SUCCESS); + Map dataMap = new BeanMap(resource); 
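+            // copy the BeanMap view into a plain map, skipping the synthetic "class" entry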
+ Map resultMap = new HashMap<>(); + for (Map.Entry entry: dataMap.entrySet()) { + if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) { + resultMap.put(entry.getKey().toString(), entry.getValue()); + } + } + result.setData(resultMap); + } catch (Exception e) { + logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e); + throw new ServiceException(Status.UPDATE_RESOURCE_ERROR); + } + + // if name unchanged, return directly without moving on HDFS + if (originResourceName.equals(name) && file == null) { + return result; + } + + if (file != null) { + // fail upload + if (!upload(loginUser, fullName, file, type)) { + logger.error("upload resource: {} file: {} failed.", name, RegexUtils.escapeNRT(file.getOriginalFilename())); + putMsg(result, Status.HDFS_OPERATION_ERROR); + throw new ServiceException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename())); + } + if (!fullName.equals(originFullName)) { + try { + HadoopUtils.getInstance().delete(originHdfsFileName,false); + } catch (IOException e) { + logger.error(e.getMessage(),e); + throw new ServiceException(String.format("delete resource: %s failed.", originFullName)); + } + } + return result; + } + + // get the path of dest file in hdfs + String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName); + + try { + logger.info("start hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName); + HadoopUtils.getInstance().copy(originHdfsFileName, destHdfsFileName, true, true); + } catch (Exception e) { + logger.error(MessageFormat.format("hdfs copy {0} -> {1} fail", originHdfsFileName, destHdfsFileName), e); + putMsg(result,Status.HDFS_COPY_FAIL); + throw new ServiceException(Status.HDFS_COPY_FAIL); + } + + return result; + } + + private Result verifyFile(String name, ResourceType type, MultipartFile file) { + Result result = new Result<>(); + putMsg(result, Status.SUCCESS); + if (file != null) { + // file is empty + if (file.isEmpty()) { + logger.error("file is empty: {}", RegexUtils.escapeNRT(file.getOriginalFilename())); + putMsg(result, Status.RESOURCE_FILE_IS_EMPTY); + return result; + } + + // file suffix + String fileSuffix = FileUtils.suffix(file.getOriginalFilename()); + String nameSuffix = FileUtils.suffix(name); + + // determine file suffix + if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) { + // rename file suffix and original suffix must be consistent + logger.error("rename file suffix and original suffix must be consistent: {}", RegexUtils.escapeNRT(file.getOriginalFilename())); + putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE); + return result; + } + + //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar + if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(fileSuffix)) { + logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg()); + putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR); + return result; + } + if (file.getSize() > Constants.MAX_FILE_SIZE) { + logger.error("file size is too large: {}", RegexUtils.escapeNRT(file.getOriginalFilename())); + putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT); + return result; + } + } + return result; + } + + /** + * query resources list paging + * + * @param loginUser login user + * @param type resource type + * @param searchVal search value + * @param pageNo page number + * @param pageSize page size + * @return resource list page + */ + public Map queryResourceListPaging(User loginUser, int directoryId, 
ResourceType type, String searchVal, Integer pageNo, Integer pageSize) { + + HashMap result = new HashMap<>(); + Page page = new Page<>(pageNo, pageSize); + int userId = loginUser.getId(); + if (isAdmin(loginUser)) { + userId = 0; + } + if (directoryId != -1) { + Resource directory = resourcesMapper.selectById(directoryId); + if (directory == null) { + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + } + + IPage resourceIPage = resourcesMapper.queryResourcePaging(page, + userId,directoryId, type.ordinal(), searchVal); + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + pageInfo.setTotalCount((int)resourceIPage.getTotal()); + pageInfo.setLists(resourceIPage.getRecords()); + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result,Status.SUCCESS); + return result; + } + + /** + * create directory + * @param loginUser login user + * @param fullName full name + * @param type resource type + * @param result Result + */ + private void createDirectory(User loginUser,String fullName,ResourceType type,Result result) { + String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode(); + String directoryName = HadoopUtils.getHdfsFileName(type,tenantCode,fullName); + String resourceRootPath = HadoopUtils.getHdfsDir(type,tenantCode); + try { + if (!HadoopUtils.getInstance().exists(resourceRootPath)) { + createTenantDirIfNotExists(tenantCode); + } + + if (!HadoopUtils.getInstance().mkdir(directoryName)) { + logger.error("create resource directory {} of hdfs failed",directoryName); + putMsg(result,Status.HDFS_OPERATION_ERROR); + throw new ServiceException(String.format("create resource directory: %s failed.", directoryName)); + } + } catch (Exception e) { + logger.error("create resource directory {} of hdfs failed",directoryName); + putMsg(result,Status.HDFS_OPERATION_ERROR); + throw new ServiceException(String.format("create resource directory: %s failed.", directoryName)); + } + } + + /** + * upload file to hdfs + * + * @param loginUser login user + * @param fullName full name + * @param file file + */ + private boolean upload(User loginUser, String fullName, MultipartFile file, ResourceType type) { + // save to local + String fileSuffix = FileUtils.suffix(file.getOriginalFilename()); + String nameSuffix = FileUtils.suffix(fullName); + + // determine file suffix + if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) { + return false; + } + // query tenant + String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode(); + // random file name + String localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString()); + + // save file to hdfs, and delete original file + String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName); + String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode); + try { + // if tenant dir not exists + if (!HadoopUtils.getInstance().exists(resourcePath)) { + createTenantDirIfNotExists(tenantCode); + } + org.apache.dolphinscheduler.api.utils.FileUtils.copyFile(file, localFilename); + HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return false; + } + return true; + } + + /** + * query resource list + * + * @param loginUser login user + * @param type resource type + * @return resource list + */ + public Map queryResourceList(User loginUser, ResourceType type) { + Map result = new HashMap<>(); + + int userId = loginUser.getId(); + if 
(isAdmin(loginUser)) {
+            userId = 0;
+        }
+        List allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
+        Visitor resourceTreeVisitor = new ResourceTreeVisitor(allResourceList);
+        result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
+        putMsg(result,Status.SUCCESS);
+
+        return result;
+    }
+
+    /**
+     * query resource list by program type
+     *
+     * @param loginUser login user
+     * @param type resource type
+     * @return resource list
+     */
+    public Map queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) {
+        Map result = new HashMap<>();
+        String suffix = ".jar";
+        int userId = loginUser.getId();
+        if (isAdmin(loginUser)) {
+            userId = 0;
+        }
+        if (programType != null) {
+            switch (programType) {
+                case JAVA:
+                case SCALA:
+                    break;
+                case PYTHON:
+                    suffix = ".py";
+                    break;
+                default:
+            }
+        }
+        List allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
+        List resources = new ResourceFilter(suffix,new ArrayList<>(allResourceList)).filter();
+        Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources);
+        result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
+        putMsg(result,Status.SUCCESS);
+
+        return result;
+    }
+
+    /**
+     * delete resource
+     *
+     * @param loginUser login user
+     * @param resourceId resource id
+     * @return delete result code
+     * @throws IOException exception
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public Result delete(User loginUser, int resourceId) throws IOException {
+        Result result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        // get resource by id
+        Resource resource = resourcesMapper.selectById(resourceId);
+        if (resource == null) {
+            putMsg(result, Status.RESOURCE_NOT_EXIST);
+            return result;
+        }
+        if (!hasPerm(loginUser, resource.getUserId())) {
+            putMsg(result, Status.USER_NO_OPERATION_PERM);
+            return result;
+        }
+
+        String tenantCode = getTenantCode(resource.getUserId(),result);
+        if (StringUtils.isEmpty(tenantCode)) {
+            return result;
+        }
+
+        // get all resource ids of process definitions that are released
+        List<Map<String, Object>> list = processDefinitionMapper.listResources();
+        Map<Integer, Set<Integer>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
+        Set resourceIdSet = resourceProcessMap.keySet();
+        // get all children of the resource
+        List allChildren = listAllChildren(resource,true);
+        Integer[] needDeleteResourceIdArray = allChildren.toArray(new Integer[allChildren.size()]);
+
+        // if resource type is UDF, need to check whether it is bound by a UDF function
+        if (resource.getType() == (ResourceType.UDF)) {
+            List udfFuncs = udfFunctionMapper.listUdfByResourceId(needDeleteResourceIdArray);
+            if (CollectionUtils.isNotEmpty(udfFuncs)) {
+                logger.error("can't be deleted, because it is bound by UDF functions:{}", udfFuncs);
+                putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
+                return result;
+            }
+        }
+
+        if (resourceIdSet.contains(resource.getPid())) {
+            logger.error("can't be deleted, because it is used by process definitions");
+            putMsg(result, Status.RESOURCE_IS_USED);
+            return result;
+        }
+        resourceIdSet.retainAll(allChildren);
+        if (CollectionUtils.isNotEmpty(resourceIdSet)) {
+            logger.error("can't be deleted, because it is used by process definitions");
+            for (Integer resId : resourceIdSet) {
+                logger.error("resource id:{} is used by process definition {}",resId,resourceProcessMap.get(resId));
+            }
+            putMsg(result, Status.RESOURCE_IS_USED);
+            return result;
+        }
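+
+        // all usage checks passed: delete the metadata rows first, then the file tree on HDFS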
+        // get hdfs file by type
+        String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
+
+        //delete data in database
+        resourcesMapper.deleteIds(needDeleteResourceIdArray);
+        resourceUserMapper.deleteResourceUserArray(0, needDeleteResourceIdArray);
+
+        //delete file on hdfs
+        HadoopUtils.getInstance().delete(hdfsFilename, true);
+        putMsg(result, Status.SUCCESS);
+
+        return result;
+    }
+
+    /**
+     * verify resource by name and type
+     * @param loginUser login user
+     * @param fullName resource full name
+     * @param type resource type
+     * @return true if the resource name does not exist, otherwise return false
+     */
+    public Result verifyResourceName(String fullName, ResourceType type, User loginUser) {
+        Result result = new Result<>();
+        putMsg(result, Status.SUCCESS);
+        if (checkResourceExists(fullName, 0, type.ordinal())) {
+            logger.error("resource type:{} name:{} already exists, can't create it again.", type, RegexUtils.escapeNRT(fullName));
+            putMsg(result, Status.RESOURCE_EXIST);
+        } else {
+            // query tenant
+            Tenant tenant = tenantMapper.queryById(loginUser.getTenantId());
+            if (tenant != null) {
+                String tenantCode = tenant.getTenantCode();
+
+                try {
+                    String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
+                    if (HadoopUtils.getInstance().exists(hdfsFilename)) {
+                        logger.error("resource type:{} name:{} already exists in hdfs {}, can't create it again.", type, RegexUtils.escapeNRT(fullName), hdfsFilename);
+                        putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename);
+                    }
+
+                } catch (Exception e) {
+                    logger.error(e.getMessage(),e);
+                    putMsg(result,Status.HDFS_OPERATION_ERROR);
+                }
+            } else {
+                putMsg(result,Status.TENANT_NOT_EXIST);
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * verify resource by full name or pid and type
+     * @param fullName resource full name
+     * @param id resource id
+     * @param type resource type
+     * @return true if the resource full name or pid does not exist, otherwise return false
+     */
+    public Result queryResource(String fullName,Integer id,ResourceType type) {
+        Result result = new Result<>();
+        if (StringUtils.isBlank(fullName) && id == null) {
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
+            return result;
+        }
+        if (StringUtils.isNotBlank(fullName)) {
+            List resourceList = resourcesMapper.queryResource(fullName,type.ordinal());
+            if (CollectionUtils.isEmpty(resourceList)) {
+                putMsg(result, Status.RESOURCE_NOT_EXIST);
+                return result;
+            }
+            putMsg(result, Status.SUCCESS);
+            result.setData(resourceList.get(0));
+        } else {
+            Resource resource = resourcesMapper.selectById(id);
+            if (resource == null) {
+                putMsg(result, Status.RESOURCE_NOT_EXIST);
+                return result;
+            }
+            Resource parentResource = resourcesMapper.selectById(resource.getPid());
+            if (parentResource == null) {
+                putMsg(result, Status.RESOURCE_NOT_EXIST);
+                return result;
+            }
+            putMsg(result, Status.SUCCESS);
+            result.setData(parentResource);
+        }
+        return result;
+    }
+
+    /**
+     * view resource file online
+     *
+     * @param resourceId resource id
+     * @param skipLineNum skip line number
+     * @param limit limit
+     * @return resource content
+     */
+    public Result readResource(int resourceId, int skipLineNum, int limit) {
+        Result result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        // get resource by id
+        Resource resource = resourcesMapper.selectById(resourceId);
+        if (resource == null) {
+            putMsg(result, Status.RESOURCE_NOT_EXIST);
+            return result;
+        }
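+        // the view whitelist checked below comes from FileUtils.getResourceViewSuffixs(), a comma-separated configuration value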
+        //check preview or not by file suffix
+        String nameSuffix = FileUtils.suffix(resource.getAlias());
+        String resourceViewSuffixs = FileUtils.getResourceViewSuffixs();
+        if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
+            List strList = Arrays.asList(resourceViewSuffixs.split(","));
+            if (!strList.contains(nameSuffix)) {
+                logger.error("resource suffix {} does not support view, resource id {}", nameSuffix, resourceId);
+                putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
+                return result;
+            }
+        }
+
+        String tenantCode = getTenantCode(resource.getUserId(),result);
+        if (StringUtils.isEmpty(tenantCode)) {
+            return result;
+        }
+
+        // hdfs path
+        String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getFullName());
+        logger.info("resource hdfs path is {}", hdfsFileName);
+        try {
+            if (HadoopUtils.getInstance().exists(hdfsFileName)) {
+                List content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit);
+
+                putMsg(result, Status.SUCCESS);
+                Map map = new HashMap<>();
+                map.put(ALIAS, resource.getAlias());
+                map.put(CONTENT, String.join("\n", content));
+                result.setData(map);
+            } else {
+                logger.error("file {} to read does not exist in hdfs", hdfsFileName);
+                putMsg(result, Status.RESOURCE_FILE_NOT_EXIST,hdfsFileName);
+            }
+
+        } catch (Exception e) {
+            logger.error("Resource {} read failed", hdfsFileName, e);
+            putMsg(result, Status.HDFS_OPERATION_ERROR);
+        }
+
+        return result;
+    }
+
+    /**
+     * create resource file online
+     *
+     * @param loginUser login user
+     * @param type resource type
+     * @param fileName file name
+     * @param fileSuffix file suffix
+     * @param desc description
+     * @param content content
+     * @param pid pid
+     * @param currentDir current directory
+     * @return create result code
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDir) {
+        Result result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        //check file suffix
+        String nameSuffix = fileSuffix.trim();
+        String resourceViewSuffixs = FileUtils.getResourceViewSuffixs();
+        if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
+            List strList = Arrays.asList(resourceViewSuffixs.split(","));
+            if (!strList.contains(nameSuffix)) {
+                logger.error("resource suffix {} does not support online creation", nameSuffix);
+                putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
+                return result;
+            }
+        }
+
+        String name = fileName.trim() + "." + nameSuffix;
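+        // build the full path; the ternary avoids a double slash when the current directory is the root "/"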
+        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name) : String.format("%s/%s",currentDir,name);
+        result = verifyResource(loginUser, type, fullName, pid);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        // save data
+        Date now = new Date();
+        Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now);
+
+        resourcesMapper.insert(resource);
+
+        putMsg(result, Status.SUCCESS);
+        Map dataMap = new BeanMap(resource);
+        Map resultMap = new HashMap<>();
+        for (Map.Entry entry: dataMap.entrySet()) {
+            if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
+                resultMap.put(entry.getKey().toString(), entry.getValue());
+            }
+        }
+        result.setData(resultMap);
+
+        String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
+
+        result = uploadContentToHdfs(fullName, tenantCode, content);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            throw new ServiceException(result.getMsg());
+        }
+        return result;
+    }
+
+    private Result checkResourceUploadStartupState() {
+        Result result = new Result<>();
+        putMsg(result, Status.SUCCESS);
+        // check whether resource upload is enabled
+        if (!PropertyUtils.getResUploadStartupState()) {
+            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
+            putMsg(result, Status.HDFS_NOT_STARTUP);
+            return result;
+        }
+        return result;
+    }
+
+    private Result verifyResource(User loginUser, ResourceType type, String fullName, int pid) {
+        Result result = verifyResourceName(fullName, type, loginUser);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+        return verifyPid(loginUser, pid);
+    }
+
+    private Result verifyPid(User loginUser, int pid) {
+        Result result = new Result<>();
+        putMsg(result, Status.SUCCESS);
+        if (pid != -1) {
+            Resource parentResource = resourcesMapper.selectById(pid);
+            if (parentResource == null) {
+                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
+                return result;
+            }
+            if (!hasPerm(loginUser, parentResource.getUserId())) {
+                putMsg(result, Status.USER_NO_OPERATION_PERM);
+                return result;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * update resource content
+     *
+     * @param resourceId resource id
+     * @param content content
+     * @return update result code
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public Result updateResourceContent(int resourceId, String content) {
+        Result result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+
+        Resource resource = resourcesMapper.selectById(resourceId);
+        if (resource == null) {
+            logger.error("resource to update does not exist, resource id {}", resourceId);
+            putMsg(result, Status.RESOURCE_NOT_EXIST);
+            return result;
+        }
+        //check can edit by file suffix
+        String nameSuffix = FileUtils.suffix(resource.getAlias());
+        String resourceViewSuffixs = FileUtils.getResourceViewSuffixs();
+        if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
+            List strList = Arrays.asList(resourceViewSuffixs.split(","));
+            if (!strList.contains(nameSuffix)) {
+                logger.error("resource suffix {} does not support update, resource id {}", nameSuffix, resourceId);
+                putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
+                return result;
+            }
+        }
+
+        String tenantCode = getTenantCode(resource.getUserId(),result);
+        if (StringUtils.isEmpty(tenantCode)) {
+            return result;
+        }
+        resource.setSize(content.getBytes().length);
+        resource.setUpdateTime(new Date());
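+        // persist the new size and update time first; uploadContentToHdfs below throws
+        // ServiceException on failure so the @Transactional update is rolled back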
+        resourcesMapper.updateById(resource);
+
+        result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            throw new ServiceException(result.getMsg());
+        }
+        return result;
+    }
+
+    /**
+     * upload resource content to hdfs
+     *
+     * @param resourceName resource name
+     * @param tenantCode tenant code
+     * @param content content
+     * @return result
+     */
+    private Result<Object> uploadContentToHdfs(String resourceName, String tenantCode, String content) {
+        Result<Object> result = new Result<>();
+        String localFilename = "";
+        String hdfsFileName = "";
+        try {
+            localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString());
+
+            if (!FileUtils.writeContent2File(content, localFilename)) {
+                // write file failed
+                logger.error("write file {} failed, content is {}", localFilename, RegexUtils.escapeNRT(content));
+                putMsg(result, Status.RESOURCE_NOT_EXIST);
+                return result;
+            }
+
+            // get resource file hdfs path
+            hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName);
+            String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
+            logger.info("resource hdfs path is {}, resource dir is {}", hdfsFileName, resourcePath);
+
+            HadoopUtils hadoopUtils = HadoopUtils.getInstance();
+            if (!hadoopUtils.exists(resourcePath)) {
+                // create the tenant dir if it does not exist
+                createTenantDirIfNotExists(tenantCode);
+            }
+            if (hadoopUtils.exists(hdfsFileName)) {
+                hadoopUtils.delete(hdfsFileName, false);
+            }
+
+            hadoopUtils.copyLocalToHdfs(localFilename, hdfsFileName, true, true);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            result.setCode(Status.HDFS_OPERATION_ERROR.getCode());
+            result.setMsg(String.format("copy %s to hdfs %s failed", localFilename, hdfsFileName));
+            return result;
+        }
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * download file
+     *
+     * @param resourceId resource id
+     * @return resource content
+     * @throws IOException exception
+     */
+    public org.springframework.core.io.Resource downloadResource(int resourceId) throws IOException {
+        // if resource upload startup
+        if (!PropertyUtils.getResUploadStartupState()) {
+            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
+            throw new ServiceException("hdfs not startup");
+        }
+
+        Resource resource = resourcesMapper.selectById(resourceId);
+        if (resource == null) {
+            logger.error("download file not exist, resource id {}", resourceId);
+            return null;
+        }
+        if (resource.isDirectory()) {
+            logger.error("resource id {} is a directory, can't download it", resourceId);
+            throw new ServiceException("can't download directory");
+        }
+
+        int userId = resource.getUserId();
+        User user = userMapper.selectById(userId);
+        if (user == null) {
+            logger.error("user id {} not exists", userId);
+            throw new ServiceException(String.format("resource owner id %d not exist", userId));
+        }
+
+        Tenant tenant = tenantMapper.queryById(user.getTenantId());
+        if (tenant == null) {
+            logger.error("tenant id {} not exists", user.getTenantId());
+            throw new ServiceException(String.format("The tenant id %d of resource owner not exist", user.getTenantId()));
+        }
+
+        String tenantCode = tenant.getTenantCode();
+
+        String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
+
+        String localFileName = FileUtils.getDownloadFilename(resource.getAlias());
+        logger.info("resource hdfs path is {}, download local filename is {}", hdfsFileName, localFileName);
+
+        HadoopUtils.getInstance().copyHdfsToLocal(hdfsFileName, localFileName, false, true);
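+        // expose the downloaded local file as a Spring core.io Resource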
+        return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
+    }
+
+    /**
+     * authorize resource tree
+     *
+     * @param loginUser login user
+     * @param userId user id
+     * @return authorized resource tree
+     */
+    public Map<String, Object> authorizeResourceTree(User loginUser, Integer userId) {
+
+        Map<String, Object> result = new HashMap<>();
+        if (isNotAdmin(loginUser, result)) {
+            return result;
+        }
+        List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
+        List<ResourceComponent> list;
+        if (CollectionUtils.isNotEmpty(resourceList)) {
+            Visitor visitor = new ResourceTreeVisitor(resourceList);
+            list = visitor.visit().getChildren();
+        } else {
+            list = new ArrayList<>(0);
+        }
+
+        result.put(Constants.DATA_LIST, list);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * unauthorized file
+     *
+     * @param loginUser login user
+     * @param userId user id
+     * @return unauthorized result code
+     */
+    public Map<String, Object> unauthorizedFile(User loginUser, Integer userId) {
+
+        Map<String, Object> result = new HashMap<>();
+        if (isNotAdmin(loginUser, result)) {
+            return result;
+        }
+        List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
+        List<Resource> list;
+        if (resourceList != null && !resourceList.isEmpty()) {
+            Set<Resource> resourceSet = new HashSet<>(resourceList);
+            List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);
+
+            getAuthorizedResourceList(resourceSet, authedResourceList);
+            list = new ArrayList<>(resourceSet);
+        } else {
+            list = new ArrayList<>(0);
+        }
+        Visitor visitor = new ResourceTreeVisitor(list);
+        result.put(Constants.DATA_LIST, visitor.visit().getChildren());
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * unauthorized udf function
+     *
+     * @param loginUser login user
+     * @param userId user id
+     * @return unauthorized result code
+     */
+    public Map<String, Object> unauthorizedUDFFunction(User loginUser, Integer userId) {
+        Map<String, Object> result = new HashMap<>();
+        // only admin can operate
+        if (isNotAdmin(loginUser, result)) {
+            return result;
+        }
+
+        List<UdfFunc> udfFuncList = udfFunctionMapper.queryUdfFuncExceptUserId(userId);
+        List<UdfFunc> resultList = new ArrayList<>();
+        Set<UdfFunc> udfFuncSet;
+        if (CollectionUtils.isNotEmpty(udfFuncList)) {
+            udfFuncSet = new HashSet<>(udfFuncList);
+
+            List<UdfFunc> authedUDFFuncList = udfFunctionMapper.queryAuthedUdfFunc(userId);
+
+            getAuthorizedResourceList(udfFuncSet, authedUDFFuncList);
+            resultList = new ArrayList<>(udfFuncSet);
+        }
+        result.put(Constants.DATA_LIST, resultList);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * authorized udf function
+     *
+     * @param loginUser login user
+     * @param userId user id
+     * @return authorized result code
+     */
+    public Map<String, Object> authorizedUDFFunction(User loginUser, Integer userId) {
+        Map<String, Object> result = new HashMap<>();
+        if (isNotAdmin(loginUser, result)) {
+            return result;
+        }
+        List<UdfFunc> udfFuncs = udfFunctionMapper.queryAuthedUdfFunc(userId);
+        result.put(Constants.DATA_LIST, udfFuncs);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * authorized file
+     *
+     * @param loginUser login user
+     * @param userId user id
+     * @return authorized result
+     */
+    public Map<String, Object> authorizedFile(User loginUser, Integer userId) {
+        Map<String, Object> result = new HashMap<>();
+        if (isNotAdmin(loginUser, result)) {
+            return result;
+        }
+        List<Resource> authedResources = resourcesMapper.queryAuthorizedResourceList(userId);
+        Visitor visitor = new ResourceTreeVisitor(authedResources);
+        String visit = JSONUtils.toJsonString(visitor.visit(), SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
+        logger.info(visit);
+        String jsonTreeStr = JSONUtils.toJsonString(visitor.visit().getChildren(), SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
+        logger.info(jsonTreeStr);
+        result.put(Constants.DATA_LIST, visitor.visit().getChildren());
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * get authorized resource list
+     *
+     * @param resourceSet resource set
+     * @param authedResourceList authorized resource list
+     */
+    private void getAuthorizedResourceList(Set<?> resourceSet, List<?> authedResourceList) {
+        Set<?> authedResourceSet;
+        if (CollectionUtils.isNotEmpty(authedResourceList)) {
+            authedResourceSet = new HashSet<>(authedResourceList);
+            resourceSet.removeAll(authedResourceSet);
+        }
+    }
+
+    /**
+     * get tenantCode by userId
+     *
+     * @param userId user id
+     * @param result return result
+     * @return tenant code
+     */
+    private String getTenantCode(int userId, Result<Object> result) {
+        User user = userMapper.selectById(userId);
+        if (user == null) {
+            logger.error("user {} not exists", userId);
+            putMsg(result, Status.USER_NOT_EXIST, userId);
+            return null;
+        }
+
+        Tenant tenant = tenantMapper.queryById(user.getTenantId());
+        if (tenant == null) {
+            logger.error("tenant not exists");
+            putMsg(result, Status.TENANT_NOT_EXIST);
+            return null;
+        }
+        return tenant.getTenantCode();
+    }
+
+    /**
+     * list all children id
+     *
+     * @param resource resource
+     * @param containSelf whether to add self to the children list
+     * @return all children id
+     */
+    List<Integer> listAllChildren(Resource resource, boolean containSelf) {
+        List<Integer> childList = new ArrayList<>();
+        if (resource.getId() != -1 && containSelf) {
+            childList.add(resource.getId());
+        }
+
+        if (resource.isDirectory()) {
+            listAllChildren(resource.getId(), childList);
+        }
+        return childList;
+    }
+
+    /**
+     * list all children id
+     *
+     * @param resourceId resource id
+     * @param childList child list
+     */
+    void listAllChildren(int resourceId, List<Integer> childList) {
+        List<Integer> children = resourcesMapper.listChildren(resourceId);
+        for (int childId : children) {
+            childList.add(childId);
+            listAllChildren(childId, childList);
+        }
+    }
+
+}
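With the resource logic now behind an interface, callers inject ResourcesService and let Spring supply ResourcesServiceImpl. A minimal caller sketch; the demo component and its method are hypothetical, only the onlineCreateResource signature is taken from this patch:

    import org.apache.dolphinscheduler.api.service.ResourcesService;
    import org.apache.dolphinscheduler.api.utils.Result;
    import org.apache.dolphinscheduler.common.enums.ResourceType;
    import org.apache.dolphinscheduler.dao.entity.User;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    // hypothetical demo component: depends on the new interface, never on the impl
    @Component
    public class OnlineResourceDemo {

        @Autowired
        private ResourcesService resourcesService;

        public Result<Object> createHelloScript(User loginUser) {
            // pid -1 with currentDir "/" targets the resource root, per verifyPid above
            return resourcesService.onlineCreateResource(loginUser, ResourceType.FILE,
                    "hello", "sh", "demo shell script", "echo hello", -1, "/");
        }
    }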
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
new file mode 100644
index 0000000000..f6fd467b0a
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.dto.ScheduleParam;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.MonitorService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.SchedulerService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
+import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
+import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.quartz.CronExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
+/**
+ * scheduler service impl
+ */
+@Service
+public class SchedulerServiceImpl extends BaseService implements SchedulerService {
+
+    private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private ExecutorService executorService;
+
+    @Autowired
+    private MonitorService monitorService;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private ScheduleMapper scheduleMapper;
+
+    @Autowired
+    private ProjectMapper projectMapper;
+
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    /**
+     * save schedule
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param processDefineId process definition id
+     * @param schedule schedule expression (JSON)
+     * @param warningType warning type
+     * @param warningGroupId warning group id
+     * @param failureStrategy failure strategy
+     * @param processInstancePriority process instance priority
+     * @param workerGroup worker group
+     * @return create result code
+     */
+    @Transactional(rollbackFor = RuntimeException.class)
+    public Map<String, Object> insertSchedule(User loginUser, String projectName,
+                                              Integer processDefineId,
+                                              String schedule,
+                                              WarningType warningType,
+                                              int warningGroupId,
+                                              FailureStrategy failureStrategy,
+                                              Priority processInstancePriority,
+                                              String workerGroup) {
+
+        Map<String, Object> result = new HashMap<>();
+
+        Project project = projectMapper.queryByName(projectName);
+
+        // check project auth
+        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
+        if (!hasProjectAndPerm) {
+            return result;
+        }
+
+        // check workflow definition release state
+        ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
+        result = executorService.checkProcessDefinitionValid(processDefinition, processDefineId);
+        if (result.get(Constants.STATUS) != Status.SUCCESS) {
+            return result;
+        }
+
+        Schedule scheduleObj = new Schedule();
+        Date now = new Date();
+
+        scheduleObj.setProjectName(projectName);
+        scheduleObj.setProcessDefinitionId(processDefinition.getId());
+        scheduleObj.setProcessDefinitionName(processDefinition.getName());
+
+        ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
+        if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
+            logger.warn("The start time must not be the same as the end time");
+            putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
+            return result;
+        }
+        scheduleObj.setStartTime(scheduleParam.getStartTime());
+        scheduleObj.setEndTime(scheduleParam.getEndTime());
+        if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+            logger.error("{} verify failure", scheduleParam.getCrontab());
+
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab());
+            return result;
+        }
+        scheduleObj.setCrontab(scheduleParam.getCrontab());
+        scheduleObj.setWarningType(warningType);
+        scheduleObj.setWarningGroupId(warningGroupId);
+        scheduleObj.setFailureStrategy(failureStrategy);
+        scheduleObj.setCreateTime(now);
+        scheduleObj.setUpdateTime(now);
+        scheduleObj.setUserId(loginUser.getId());
+        scheduleObj.setUserName(loginUser.getUserName());
+        scheduleObj.setReleaseState(ReleaseState.OFFLINE);
+        scheduleObj.setProcessInstancePriority(processInstancePriority);
+        scheduleObj.setWorkerGroup(workerGroup);
+        scheduleMapper.insert(scheduleObj);
+
+        // update receivers and cc by process definition id
+        processDefinition.setWarningGroupId(warningGroupId);
+        processDefinitionMapper.updateById(processDefinition);
+
+        // return the scheduler object with its ID
+        result.put(Constants.DATA_LIST, scheduleMapper.selectById(scheduleObj.getId()));
+        putMsg(result, Status.SUCCESS);
+
+        result.put("scheduleId", scheduleObj.getId());
+        return result;
+    }
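+    // The schedule parameter above arrives as a JSON string bound to ScheduleParam.
+    // A sketch of the expected payload, inferred from the getters used in this class
+    // (startTime, endTime, crontab); the exact date format is an assumption:
+    //   {"startTime":"2021-01-01 00:00:00","endTime":"2022-01-01 00:00:00","crontab":"0 0 1 * * ? *"}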
+
+    /**
+     * update schedule
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param id scheduler id
+     * @param scheduleExpression schedule expression (JSON)
+     * @param warningType warning type
+     * @param warningGroupId warning group id
+     * @param failureStrategy failure strategy
+     * @param scheduleStatus schedule status
+     * @param processInstancePriority process instance priority
+     * @param workerGroup worker group
+     * @return update result code
+     */
+    @Transactional(rollbackFor = RuntimeException.class)
+    public Map<String, Object> updateSchedule(User loginUser,
+                                              String projectName,
+                                              Integer id,
+                                              String scheduleExpression,
+                                              WarningType warningType,
+                                              int warningGroupId,
+                                              FailureStrategy failureStrategy,
+                                              ReleaseState scheduleStatus,
+                                              Priority processInstancePriority,
+                                              String workerGroup) {
+        Map<String, Object> result = new HashMap<>();
+
+        Project project = projectMapper.queryByName(projectName);
+
+        // check project auth
+        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
+        if (!hasProjectAndPerm) {
+            return result;
+        }
+
+        // check schedule exists
+        Schedule schedule = scheduleMapper.selectById(id);
+
+        if (schedule == null) {
+            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
+            return result;
+        }
+
+        ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId());
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId());
+            return result;
+        }
+
+        // an online schedule must not be modified
+        if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
+            return result;
+        }
+
+        Date now = new Date();
+
+        // update schedule params
+        if (StringUtils.isNotEmpty(scheduleExpression)) {
+            ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
+            if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
+                logger.warn("The start time must not be the same as the end time");
+                putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
+                return result;
+            }
+            schedule.setStartTime(scheduleParam.getStartTime());
+            schedule.setEndTime(scheduleParam.getEndTime());
+            if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+                putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
+                return result;
+            }
+            schedule.setCrontab(scheduleParam.getCrontab());
+        }
+
+        if (warningType != null) {
+            schedule.setWarningType(warningType);
+        }
+
+        schedule.setWarningGroupId(warningGroupId);
+
+        if (failureStrategy != null) {
+            schedule.setFailureStrategy(failureStrategy);
+        }
+
+        if (scheduleStatus != null) {
+            schedule.setReleaseState(scheduleStatus);
+        }
+        schedule.setWorkerGroup(workerGroup);
+        schedule.setUpdateTime(now);
+        schedule.setProcessInstancePriority(processInstancePriority);
+        scheduleMapper.updateById(schedule);
+
+        // update receivers and cc by process definition id
+        processDefinition.setWarningGroupId(warningGroupId);
+
+        processDefinitionMapper.updateById(processDefinition);
+
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+
+    /**
+     * set schedule online or offline
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param id scheduler id
+     * @param scheduleStatus schedule status
+     * @return publish result code
+     */
+    @Transactional(rollbackFor = RuntimeException.class)
+    public Map<String, Object> setScheduleState(User loginUser,
+                                                String projectName,
+                                                Integer id,
+                                                ReleaseState scheduleStatus) {
+        Map<String, Object> result = new HashMap<>();
+
+        Project project = projectMapper.queryByName(projectName);
+        // check project auth
+        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
+        if (!hasProjectAndPerm) {
+            return result;
+        }
+
+        // check schedule exists
+        Schedule scheduleObj = scheduleMapper.selectById(id);
+
+        if (scheduleObj == null) {
+            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
+            return result;
+        }
+        // check schedule release state
+        if (scheduleObj.getReleaseState() == scheduleStatus) {
+            logger.info("schedule release state is already {}, no need to change schedule id: {} from {} to {}",
+                    scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
+            putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
+            return result;
+        }
+        ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId());
+        if (processDefinition == null) {
+            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId());
+            return result;
+        }
+
+        if (scheduleStatus == ReleaseState.ONLINE) {
+            // check process definition release state
+            if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
+                logger.info("not release process definition id: {}, name: {}",
+                        processDefinition.getId(), processDefinition.getName());
+                putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
+                return result;
+            }
+            // check sub process definition release state
+            List<Integer> subProcessDefineIds = new ArrayList<>();
+            processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
+            Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
+            if (!subProcessDefineIds.isEmpty()) {
+                List<ProcessDefinition> subProcessDefinitionList =
+                        processDefinitionMapper.queryDefinitionListByIdList(idArray);
+                if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
+                    for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
+                        // if any sub process definition is not online, exit directly
+                        if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
+                            logger.info("not release process definition id: {}, name: {}",
+                                    subProcessDefinition.getId(), subProcessDefinition.getName());
+                            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subProcessDefinition.getId());
+                            return result;
+                        }
+                    }
+                }
+            }
+        }
+
+        // check master server exists
+        List<Server> masterServers = monitorService.getServerListFromZK(true);
+
+        if (masterServers.isEmpty()) {
+            putMsg(result, Status.MASTER_NOT_EXISTS);
+            return result;
+        }
+
+        // set status
+        scheduleObj.setReleaseState(scheduleStatus);
+
+        scheduleMapper.updateById(scheduleObj);
+
+        try {
+            switch (scheduleStatus) {
+                case ONLINE:
+                    logger.info("Call master client set schedule online, project id: {}, flow id: {}, host: {}", project.getId(), processDefinition.getId(), masterServers);
+                    setSchedule(project.getId(), scheduleObj);
+                    break;
+                case OFFLINE:
+                    logger.info("Call master client set schedule offline, project id: {}, flow id: {}, host: {}", project.getId(), processDefinition.getId(), masterServers);
+                    deleteSchedule(project.getId(), id);
+                    break;
+                default:
+                    putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
+                    return result;
+            }
+        } catch (Exception e) {
+            result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
"set online failure" : "set offline failure"); + throw new ServiceException(result.get(Constants.MSG).toString()); + } + + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query schedule + * + * @param loginUser login user + * @param projectName project name + * @param processDefineId process definition id + * @param pageNo page number + * @param pageSize page size + * @param searchVal search value + * @return schedule list page + */ + public Map querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize) { + + HashMap result = new HashMap<>(); + + Project project = projectMapper.queryByName(projectName); + + // check project auth + boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result); + if (!hasProjectAndPerm) { + return result; + } + + ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId); + return result; + } + Page page = new Page<>(pageNo, pageSize); + IPage scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging( + page, processDefineId, searchVal + ); + + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + pageInfo.setTotalCount((int) scheduleIPage.getTotal()); + pageInfo.setLists(scheduleIPage.getRecords()); + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * query schedule list + * + * @param loginUser login user + * @param projectName project name + * @return schedule list + */ + public Map queryScheduleList(User loginUser, String projectName) { + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + // check project auth + boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result); + if (!hasProjectAndPerm) { + return result; + } + + List schedules = scheduleMapper.querySchedulerListByProjectName(projectName); + + result.put(Constants.DATA_LIST, schedules); + putMsg(result, Status.SUCCESS); + + return result; + } + + public void setSchedule(int projectId, Schedule schedule) { + int scheduleId = schedule.getId(); + logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId); + + Date startDate = schedule.getStartTime(); + Date endDate = schedule.getEndTime(); + + String jobName = QuartzExecutors.buildJobName(scheduleId); + String jobGroupName = QuartzExecutors.buildJobGroupName(projectId); + + Map dataMap = QuartzExecutors.buildDataMap(projectId, scheduleId, schedule); + + QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate, + schedule.getCrontab(), dataMap); + + } + + /** + * delete schedule + * + * @param projectId project id + * @param scheduleId schedule id + * @throws RuntimeException runtime exception + */ + public void deleteSchedule(int projectId, int scheduleId) { + logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId); + + String jobName = QuartzExecutors.buildJobName(scheduleId); + String jobGroupName = QuartzExecutors.buildJobGroupName(projectId); + + if (!QuartzExecutors.getInstance().deleteJob(jobName, jobGroupName)) { + logger.warn("set offline failure:projectId:{},scheduleId:{}", projectId, scheduleId); + throw new ServiceException("set offline failure"); + } + + } + + /** + * check valid + * + * @param result result + * @param bool bool + * @param status status + 
+
+    /**
+     * check valid
+     *
+     * @param result result
+     * @param bool whether the forbidden condition holds
+     * @param status status to set when it does
+     * @return true if the check failed and the result has been populated
+     */
+    private boolean checkValid(Map<String, Object> result, boolean bool, Status status) {
+        // when the condition holds, set the given status and stop
+        if (bool) {
+            putMsg(result, status);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * delete schedule by id
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param scheduleId schedule id
+     * @return delete result code
+     */
+    public Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId) {
+
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByName(projectName);
+
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
+        if (resultEnum != Status.SUCCESS) {
+            return checkResult;
+        }
+
+        Schedule schedule = scheduleMapper.selectById(scheduleId);
+
+        if (schedule == null) {
+            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId);
+            return result;
+        }
+
+        // determine whether the login user is the owner of the schedule
+        if (loginUser.getId() != schedule.getUserId()
+                && loginUser.getUserType() != UserType.ADMIN_USER) {
+            putMsg(result, Status.USER_NO_OPERATION_PERM);
+            return result;
+        }
+
+        // check whether the schedule is already online
+        if (schedule.getReleaseState() == ReleaseState.ONLINE) {
+            putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
+            return result;
+        }
+
+        int delete = scheduleMapper.deleteById(scheduleId);
+
+        if (delete > 0) {
+            putMsg(result, Status.SUCCESS);
+        } else {
+            putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
+        }
+        return result;
+    }
+
+    /**
+     * preview schedule
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param schedule schedule expression
+     * @return the next five fire times
+     */
+    public Map<String, Object> previewSchedule(User loginUser, String projectName, String schedule) {
+        Map<String, Object> result = new HashMap<>();
+        CronExpression cronExpression;
+        ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
+        Date now = new Date();
+
+        Date startTime = now.after(scheduleParam.getStartTime()) ? now : scheduleParam.getStartTime();
+        Date endTime = scheduleParam.getEndTime();
+        try {
+            cronExpression = CronUtils.parse2CronExpression(scheduleParam.getCrontab());
+        } catch (ParseException e) {
+            logger.error(e.getMessage(), e);
+            putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
+            return result;
+        }
+        List<Date> selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
+        // collect into a List so DATA_LIST serializes correctly (a raw Stream would not)
+        result.put(Constants.DATA_LIST, selfFireDateList.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
+}
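previewSchedule delegates cron handling to Quartz through CronUtils. The same idea as a standalone sketch against the plain Quartz API; the crontab value is illustrative, and the loop count of five mirrors the "next five fire times" contract in the javadoc above:

    import java.text.ParseException;
    import java.util.Date;

    import org.quartz.CronExpression;

    public class CronPreviewDemo {
        public static void main(String[] args) throws ParseException {
            // the same Quartz syntax the scheduler validates with isValidExpression(...)
            CronExpression cron = new CronExpression("0 0 1 * * ? *");
            Date next = new Date();
            for (int i = 0; i < 5; i++) {
                next = cron.getNextValidTimeAfter(next); // next five fire times
                System.out.println(next);
            }
        }
    }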
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
new file mode 100644
index 0000000000..6c91e20a00
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.TaskInstanceService;
+import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
+/**
+ * task instance service impl
+ */
+@Service
+public class TaskInstanceServiceImpl extends BaseService implements TaskInstanceService {
+
+    @Autowired
+    ProjectMapper projectMapper;
+
+    @Autowired
+    ProjectService projectService;
+
+    @Autowired
+    ProcessService processService;
+
+    @Autowired
+    TaskInstanceMapper taskInstanceMapper;
+
+    @Autowired
+    ProcessInstanceService processInstanceService;
+
+    @Autowired
+    UsersService usersService;
+
+    /**
+     * query task list by project, process instance, task name, task start time, task end time,
+     * task status, keyword paging
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param processInstanceId process instance id
+     * @param processInstanceName process instance name
+     * @param taskName task name
+     * @param executorName executor name
+     * @param startDate start time
+     * @param endDate end time
+     * @param searchVal search value
+     * @param stateType state type
+     * @param host host
+     * @param pageNo page number
+     * @param pageSize page size
+     * @return task list page
+     */
+    public Map<String, Object> queryTaskListPaging(User loginUser, String projectName,
+                                                   Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate,
+                                                   String endDate, String searchVal, ExecutionStatus stateType, String host,
+                                                   Integer pageNo, Integer pageSize) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByName(projectName);
+
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status status = (Status) checkResult.get(Constants.STATUS);
+        if (status != Status.SUCCESS) {
+            return checkResult;
+        }
+
+        int[] statusArray = null;
+        if (stateType != null) {
+            statusArray = new int[]{stateType.ordinal()};
+        }
+
+        Date start = null;
+        Date end = null;
+        if (StringUtils.isNotEmpty(startDate)) {
+            start = DateUtils.getScheduleDate(startDate);
+            if (start == null) {
+                return generateInvalidParamRes(result, "startDate");
+            }
+        }
+        if (StringUtils.isNotEmpty(endDate)) {
+            end = DateUtils.getScheduleDate(endDate);
+            if (end == null) {
+                return generateInvalidParamRes(result, "endDate");
+            }
+        }
+
+        Page<TaskInstance> page = new Page<>(pageNo, pageSize);
+        PageInfo<Map<String, Object>> pageInfo = new PageInfo<>(pageNo, pageSize);
+        int executorId = usersService.getUserIdByName(executorName);
+
+        IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
+                page, project.getId(), processInstanceId, processInstanceName, searchVal, taskName, executorId, statusArray, host, start, end
+        );
+        Set<String> exclusionSet = new HashSet<>();
+        exclusionSet.add(Constants.CLASS);
+        exclusionSet.add("taskJson");
+        List<TaskInstance> taskInstanceList = taskInstanceIPage.getRecords();
+
+        for (TaskInstance taskInstance : taskInstanceList) {
+            taskInstance.setDuration(DateUtils.format2Duration(taskInstance.getStartTime(), taskInstance.getEndTime()));
+            User executor = usersService.queryUser(taskInstance.getExecutorId());
+            if (null != executor) {
+                taskInstance.setExecutorName(executor.getUserName());
+            }
+        }
+        pageInfo.setTotalCount((int) taskInstanceIPage.getTotal());
+        pageInfo.setLists(CollectionUtils.getListByExclusion(taskInstanceIPage.getRecords(), exclusionSet));
+        result.put(Constants.DATA_LIST, pageInfo);
+        putMsg(result, Status.SUCCESS);
+
+        return result;
+    }
+
+    /**
+     * change one task instance's state from failure to forced success
+     *
+     * @param loginUser login user
+     * @param projectName project name
+     * @param taskInstanceId task instance id
+     * @return the result code and msg
+     */
+    public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
+        Map<String, Object> result = new HashMap<>();
+        Project project = projectMapper.queryByName(projectName);
+
+        // check user auth
+        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
+        Status status = (Status) checkResult.get(Constants.STATUS);
+        if (status != Status.SUCCESS) {
+            return checkResult;
+        }
+
+        // check whether the task instance can be found
+        TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
+        if (task == null) {
+            putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
+            return result;
+        }
+
+        // check whether the task instance state type is failure
+        if (!task.getState().typeIsFailure()) {
+            putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
+            return result;
+        }
+
+        // change the state of the task instance
+        task.setState(ExecutionStatus.FORCED_SUCCESS);
+        int changedNum = taskInstanceMapper.updateById(task);
+        if (changedNum > 0) {
+            putMsg(result, Status.SUCCESS);
+        } else {
+            putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
+        }
+
+        return result;
+    }
+
+    /**
+     * generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name
+     *
+     * @param result exist result map
+     * @param params invalid params name
+     * @return updated result map
+     */
+    private Map<String, Object> generateInvalidParamRes(Map<String, Object> result, String params) {
+        result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
+        result.put(Constants.MSG, MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), params));
+        return result;
+    }
+}
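The forced-success gate above rests entirely on ExecutionStatus.typeIsFailure(). A tiny standalone sketch of that gate; the two states are chosen only for illustration:

    import org.apache.dolphinscheduler.common.enums.ExecutionStatus;

    public class ForcedSuccessGateDemo {
        public static void main(String[] args) {
            // only failure-type states may be forced to success
            System.out.println(ExecutionStatus.FAILURE.typeIsFailure()); // true
            System.out.println(ExecutionStatus.SUCCESS.typeIsFailure()); // false
        }
    }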
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 8aafe9102e..9f23428414 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -87,7 +87,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
                                           int queueId,
                                           String desc) throws Exception {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
         if (isNotAdmin(loginUser, result)) {
             return result;
@@ -140,7 +140,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      */
     public Map<String, Object> queryTenantList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         if (isNotAdmin(loginUser, result)) {
             return result;
         }
@@ -171,7 +171,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
     public Map<String, Object> updateTenant(User loginUser, int id, String tenantCode, int queueId,
                                             String desc) throws Exception {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
 
         if (isNotAdmin(loginUser, result)) {
@@ -233,7 +233,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      */
     @Transactional(rollbackFor = Exception.class)
     public Map<String, Object> deleteTenantById(User loginUser, int id) throws Exception {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         if (isNotAdmin(loginUser, result)) {
             return result;
@@ -291,7 +291,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      */
     public Map<String, Object> queryTenantList(String tenantCode) {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         List<Tenant> resourceList = tenantMapper.queryByTenantCode(tenantCode);
         if (CollectionUtils.isNotEmpty(resourceList)) {
@@ -311,7 +311,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      */
     public Map<String, Object> queryTenantList(User loginUser) {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         List<Tenant> resourceList = tenantMapper.selectList(null);
         result.put(Constants.DATA_LIST, resourceList);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index d1416dfd11..58e4912be5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -132,8 +132,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
                                           String phone,
                                           String queue,
                                           int state) throws IOException {
-
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         //check all user params
         String msg = this.checkUserParams(userName, userPassword, email, phone);
@@ -295,7 +294,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @return user list page
      */
     public Map<String, Object> queryUserList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -337,7 +336,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
                                           String phone,
                                           String queue,
                                           int state) throws IOException {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
 
         if (check(result, !hasPerm(loginUser, userId), Status.USER_NO_OPERATION_PERM)) {
@@ -461,7 +460,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @throws Exception exception when operate hdfs
      */
     public Map<String, Object> deleteUserById(User loginUser, int id) throws IOException {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (!isAdmin(loginUser)) {
             putMsg(result, Status.USER_NO_OPERATION_PERM, id);
@@ -501,7 +500,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> grantProject(User loginUser, int userId, String projectIds) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
 
         //only admin can operate
@@ -550,7 +549,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -645,7 +644,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> grantUDFFunction(User loginUser, int userId, String udfIds) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         //only admin can operate
         if (check(result, !isAdmin(loginUser),
@@ -691,7 +690,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> grantDataSource(User loginUser, int userId, String datasourceIds) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
 
         //only admin can operate
@@ -771,7 +770,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @return user list
      */
    public Map<String, Object> queryAllGeneralUsers(User loginUser) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -791,7 +790,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @return user list
      */
     public Map<String, Object> queryUserList(User loginUser) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -832,7 +831,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     public Map<String, Object> unauthorizedUser(User loginUser, Integer alertgroupId) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -867,7 +866,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @return authorized result code
      */
     public Map<String, Object> authorizedUser(User loginUser, Integer alertgroupId) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
index 9ff7fac463..482cb55306 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
@@ -44,4 +44,13 @@ public class RegexUtils {
         Matcher isNum = pattern.matcher(str);
         return isNum.matches();
     }
+
+    public static String escapeNRT(String str) {
+        // Logging should not be vulnerable to injection attacks: Replace pattern-breaking characters
+        if (str != null && !str.isEmpty()) {
+            return str.replaceAll("[\n|\r|\t]", "_");
+        }
+        return null;
+    }
+
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
index d430d3a755..7b9c869f49 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.ResourcesServiceImpl;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
@@ -68,7 +69,7 @@ public class ResourcesServiceTest {
     private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceTest.class);
 
     @InjectMocks
-    private ResourcesService resourcesService;
+    private ResourcesServiceImpl resourcesService;
     @Mock
     private ResourceMapper resourcesMapper;
     @Mock
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
index deadc2129c..389afed398 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.model.Server;
@@ -53,7 +54,7 @@ public class SchedulerServiceTest {
 
     @InjectMocks
-    private SchedulerService schedulerService;
+    private SchedulerServiceImpl schedulerService;
 
     @Mock
     private MonitorService monitorService;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
index b1989b4e31..0f0071c849 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.api.ApiApplicationServer;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.UserType;
@@ -62,7 +63,7 @@ public class TaskInstanceServiceTest {
     private static final Logger logger = LoggerFactory.getLogger(TaskInstanceServiceTest.class);
 
     @InjectMocks
-    private TaskInstanceService taskInstanceService;
+    private TaskInstanceServiceImpl taskInstanceService;
 
     @Mock
     ProjectMapper projectMapper;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
index 6bed928e14..de2d62d10e 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
@@ -50,12 +50,8 @@ public class StringUtils {
         return !isBlank(s);
     }
 
-    public static String replaceNRTtoUnderline(String src) {
-        if (isBlank(src)) {
-            return src;
-        } else {
-            return src.replaceAll("[\n|\r|\t]", "_");
-        }
+    public static String replaceNRTtoUnderline(String str) {
+        return isBlank(str) ? str : str.replaceAll("[\n|\r|\t]", "_");
     }
 
     public static String trim(String str) {