diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java index f28ba100f2..40effb641d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java @@ -60,6 +60,50 @@ public class ResourcesController extends BaseController{ @Autowired private UdfFuncService udfFuncService; + /** + * create resource + * + * @param loginUser login user + * @param alias alias + * @param description description + * @param type type + * @return create result code + */ + + /** + * + * @param loginUser login user + * @param type type + * @param alias alias + * @param description description + * @param pid parent id + * @param currentDir current directory + * @return + */ + @ApiOperation(value = "createDirctory", notes= "CREATE_RESOURCE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"), + @ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType ="String"), + @ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType ="String"), + @ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required = true, dataType = "MultipartFile") + }) + @PostMapping(value = "/directory/create") + public Result createDirectory(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "type") ResourceType type, + @RequestParam(value ="name") String alias, + @RequestParam(value = "description", required = false) String description, + @RequestParam(value ="pid") int pid, + @RequestParam(value ="currentDir") String currentDir) { + try { + logger.info("login user {}, create resource, type: {}, resource alias: {}, desc: {}, file: {},{}", + loginUser.getUserName(),type, alias, description,pid,currentDir); + return resourceService.createDirectory(loginUser,alias, description,type ,pid,currentDir); + } catch (Exception e) { + logger.error(CREATE_RESOURCE_ERROR.getMsg(),e); + return error(CREATE_RESOURCE_ERROR.getCode(), CREATE_RESOURCE_ERROR.getMsg()); + } + } + /** * create resource * @@ -80,13 +124,15 @@ public class ResourcesController extends BaseController{ @PostMapping(value = "/create") public Result createResource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "type") ResourceType type, - @RequestParam(value ="name")String alias, + @RequestParam(value ="name") String alias, @RequestParam(value = "description", required = false) String description, - @RequestParam("file") MultipartFile file) { + @RequestParam("file") MultipartFile file, + @RequestParam(value ="pid") int pid, + @RequestParam(value ="currentDir") String currentDir) { try { logger.info("login user {}, create resource, type: {}, resource alias: {}, desc: {}, file: {},{}", loginUser.getUserName(),type, alias, description, file.getName(), file.getOriginalFilename()); - return resourceService.createResource(loginUser,alias, description,type ,file); + return resourceService.createResource(loginUser,alias, description,type ,file,pid,currentDir); } catch (Exception e) { logger.error(CREATE_RESOURCE_ERROR.getMsg(),e); return error(CREATE_RESOURCE_ERROR.getCode(), CREATE_RESOURCE_ERROR.getMsg()); @@ -120,7 +166,7 @@ public class 
ResourcesController extends BaseController{ try { logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}", loginUser.getUserName(),type, alias, description); - return resourceService.updateResource(loginUser,resourceId,alias, description,type); + return resourceService.updateResource(loginUser,resourceId,alias,description,type); } catch (Exception e) { logger.error(UPDATE_RESOURCE_ERROR.getMsg(),e); return error(Status.UPDATE_RESOURCE_ERROR.getCode(), Status.UPDATE_RESOURCE_ERROR.getMsg()); @@ -166,6 +212,7 @@ public class ResourcesController extends BaseController{ @ApiOperation(value = "queryResourceListPaging", notes= "QUERY_RESOURCE_LIST_PAGING_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"), + @ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType ="int"), @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType ="String"), @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType ="Int",example = "20") @@ -174,6 +221,7 @@ public class ResourcesController extends BaseController{ @ResponseStatus(HttpStatus.OK) public Result queryResourceListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value ="type") ResourceType type, + @RequestParam(value ="id") int id, @RequestParam("pageNo") Integer pageNo, @RequestParam(value = "searchVal", required = false) String searchVal, @RequestParam("pageSize") Integer pageSize @@ -187,7 +235,7 @@ public class ResourcesController extends BaseController{ } searchVal = ParameterUtils.handleEscapes(searchVal); - result = resourceService.queryResourceListPaging(loginUser,type,searchVal,pageNo, pageSize); + result = resourceService.queryResourceListPaging(loginUser,id,type,searchVal,pageNo, pageSize); return returnDataListPaging(result); }catch (Exception e){ logger.error(QUERY_RESOURCES_LIST_PAGING.getMsg(),e); @@ -227,32 +275,89 @@ public class ResourcesController extends BaseController{ * verify resource by alias and type * * @param loginUser login user - * @param alias resource name - * @param type resource type + * @param fullName resource full name + * @param type resource type * @return true if the resource name not exists, otherwise return false */ @ApiOperation(value = "verifyResourceName", notes= "VERIFY_RESOURCE_NAME_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"), - @ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType ="String") + @ApiImplicitParam(name = "fullName", value = "RESOURCE_FULL_NAME", required = true, dataType ="String") }) @GetMapping(value = "/verify-name") @ResponseStatus(HttpStatus.OK) public Result verifyResourceName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value ="name") String alias, + @RequestParam(value ="fullName") String fullName, @RequestParam(value ="type") ResourceType type ) { try { logger.info("login user {}, verfiy resource alias: {},resource type: {}", - loginUser.getUserName(), alias,type); + loginUser.getUserName(), fullName,type); - return resourceService.verifyResourceName(alias,type,loginUser); + return resourceService.verifyResourceName(fullName,type,loginUser); } catch (Exception e) { logger.error(VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg(), e); return 
error(Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getCode(), Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg()); } } + /** + * query resources jar list + * + * @param loginUser login user + * @param type resource type + * @return resource list + */ + @ApiOperation(value = "queryResourceJarList", notes= "QUERY_RESOURCE_LIST_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType") + }) + @GetMapping(value="/list/jar") + @ResponseStatus(HttpStatus.OK) + public Result queryResourceJarList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value ="type") ResourceType type + ){ + try{ + logger.info("query resource list, login user:{}, resource type:{}", loginUser.getUserName(), type.toString()); + Map result = resourceService.queryResourceJarList(loginUser, type); + return returnDataList(result); + }catch (Exception e){ + logger.error(QUERY_RESOURCES_LIST_ERROR.getMsg(),e); + return error(Status.QUERY_RESOURCES_LIST_ERROR.getCode(), Status.QUERY_RESOURCES_LIST_ERROR.getMsg()); + } + } + + /** + * query resource by full name and type + * + * @param loginUser login user + * @param fullName resource full name + * @param type resource type + * @return true if the resource name not exists, otherwise return false + */ + @ApiOperation(value = "queryResource", notes= "QUERY_BY_RESOURCE_NAME") + @ApiImplicitParams({ + @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"), + @ApiImplicitParam(name = "fullName", value = "RESOURCE_FULL_NAME", required = true, dataType ="String") + }) + @GetMapping(value = "/queryResource") + @ResponseStatus(HttpStatus.OK) + public Result queryResource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value ="fullName",required = false) String fullName, + @RequestParam(value ="id",required = false) Integer id, + @RequestParam(value ="type") ResourceType type + ) { + try { + logger.info("login user {}, query resource by full name: {} or id: {},resource type: {}", + loginUser.getUserName(), fullName,id,type); + + return resourceService.queryResource(fullName,id,type); + } catch (Exception e) { + logger.error(RESOURCE_NOT_EXIST.getMsg(), e); + return error(Status.RESOURCE_NOT_EXIST.getCode(), Status.RESOURCE_NOT_EXIST.getMsg()); + } + } + /** * view resource file online * @@ -310,16 +415,18 @@ public class ResourcesController extends BaseController{ @RequestParam(value ="fileName")String fileName, @RequestParam(value ="suffix")String fileSuffix, @RequestParam(value = "description", required = false) String description, - @RequestParam(value = "content") String content + @RequestParam(value = "content") String content, + @RequestParam(value ="pid") int pid, + @RequestParam(value ="currentDir") String currentDir ) { try{ logger.info("login user {}, online create resource! 
fileName : {}, type : {}, suffix : {},desc : {},content : {}", - loginUser.getUserName(),fileName,type,fileSuffix,description,content); + loginUser.getUserName(),fileName,type,fileSuffix,description,content,pid,currentDir); if(StringUtils.isEmpty(content)){ logger.error("resource file contents are not allowed to be empty"); return error(Status.RESOURCE_FILE_IS_EMPTY.getCode(), RESOURCE_FILE_IS_EMPTY.getMsg()); } - return resourceService.onlineCreateResource(loginUser,type,fileName,fileSuffix,description,content); + return resourceService.onlineCreateResource(loginUser,type,fileName,fileSuffix,description,content,pid,currentDir); }catch (Exception e){ logger.error(CREATE_RESOURCE_FILE_ON_LINE_ERROR.getMsg(),e); return error(Status.CREATE_RESOURCE_FILE_ON_LINE_ERROR.getCode(), Status.CREATE_RESOURCE_FILE_ON_LINE_ERROR.getMsg()); @@ -384,6 +491,9 @@ public class ResourcesController extends BaseController{ .ok() .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getFilename() + "\"") .body(file); + }catch (RuntimeException e){ + logger.error(e.getMessage(),e); + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(e.getMessage()); }catch (Exception e){ logger.error(DOWNLOAD_RESOURCE_FILE_ERROR.getMsg(),e); return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Status.DOWNLOAD_RESOURCE_FILE_ERROR.getMsg()); @@ -658,21 +768,21 @@ public class ResourcesController extends BaseController{ * @param userId user id * @return unauthorized result code */ - @ApiOperation(value = "unauthorizedFile", notes= "UNAUTHORIZED_FILE_NOTES") + @ApiOperation(value = "authorizeResourceTree", notes= "AUTHORIZE_RESOURCE_TREE_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "userId", value = "USER_ID", required = true, dataType ="Int", example = "100") }) - @GetMapping(value = "/unauth-file") + @GetMapping(value = "/authorize-resource-tree") @ResponseStatus(HttpStatus.CREATED) - public Result unauthorizedFile(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result authorizeResourceTree(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("userId") Integer userId) { try{ - logger.info("resource unauthorized file, user:{}, unauthorized user id:{}", loginUser.getUserName(), userId); - Map result = resourceService.unauthorizedFile(loginUser, userId); + logger.info("all resource file, user:{}, user id:{}", loginUser.getUserName(), userId); + Map result = resourceService.authorizeResourceTree(loginUser, userId); return returnDataList(result); }catch (Exception e){ - logger.error(UNAUTHORIZED_FILE_RESOURCE_ERROR.getMsg(),e); - return error(Status.UNAUTHORIZED_FILE_RESOURCE_ERROR.getCode(), Status.UNAUTHORIZED_FILE_RESOURCE_ERROR.getMsg()); + logger.error(AUTHORIZE_RESOURCE_TREE.getMsg(),e); + return error(Status.AUTHORIZE_RESOURCE_TREE.getCode(), Status.AUTHORIZE_RESOURCE_TREE.getMsg()); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java new file mode 100644 index 0000000000..289d5060bf --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java @@ -0,0 +1,29 @@ +package org.apache.dolphinscheduler.api.dto.resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * directory + */ +public class Directory extends ResourceComponent{ + + @Override + public boolean isDirctory() { + return true; + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java new file mode 100644 index 0000000000..b9b91821f4 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java @@ -0,0 +1,24 @@ +package org.apache.dolphinscheduler.api.dto.resources; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * file leaf + */ +public class FileLeaf extends ResourceComponent{ + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java new file mode 100644 index 0000000000..fb0da702b3 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java @@ -0,0 +1,193 @@ +package org.apache.dolphinscheduler.api.dto.resources; + +import com.alibaba.fastjson.annotation.JSONField; +import com.alibaba.fastjson.annotation.JSONType; +import org.apache.dolphinscheduler.common.enums.ResourceType; + +import java.util.ArrayList; +import java.util.List; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * resource component + */ +@JSONType(orders={"id","pid","name","fullName","description","isDirctory","children","type"}) +public abstract class ResourceComponent { + public ResourceComponent() { + } + + public ResourceComponent(int id, int pid, String name, String fullName, String description, boolean isDirctory) { + this.id = id; + this.pid = pid; + this.name = name; + this.fullName = fullName; + this.description = description; + this.isDirctory = isDirctory; + int directoryFlag = isDirctory ? 1:0; + this.idValue = String.format("%s_%s",id,directoryFlag); + } + + + /** + * id + */ + @JSONField(ordinal = 1) + protected int id; + /** + * parent id + */ + @JSONField(ordinal = 2) + protected int pid; + /** + * name + */ + @JSONField(ordinal = 3) + protected String name; + /** + * current directory + */ + protected String currentDir; + /** + * full name + */ + @JSONField(ordinal = 4) + protected String fullName; + /** + * description + */ + @JSONField(ordinal = 5) + protected String description; + /** + * is directory + */ + @JSONField(ordinal = 6) + protected boolean isDirctory; + /** + * id value + */ + @JSONField(ordinal = 7) + protected String idValue; + /** + * resoruce type + */ + @JSONField(ordinal = 8) + protected ResourceType type; + /** + * children + */ + @JSONField(ordinal = 8) + protected List children = new ArrayList<>(); + + /** + * add resource component + * @param resourceComponent resource component + */ + public void add(ResourceComponent resourceComponent){ + children.add(resourceComponent); + } + + public String getName(){ + return this.name; + } + + public String getDescription(){ + return this.description; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public int getPid() { + return pid; + } + + public void setPid(int pid) { + this.pid = pid; + } + + public void setName(String name) { + this.name = name; + } + + public String getFullName() { + return fullName; + } + + public void setFullName(String fullName) { + this.fullName = fullName; + } + + public void setDescription(String description) { + this.description = description; + } + + public boolean isDirctory() { + return isDirctory; + } + + public void setDirctory(boolean dirctory) { + isDirctory = dirctory; + } + + public String getIdValue() { + return idValue; + } + + public void setIdValue(int id,boolean isDirctory) { + int directoryFlag = isDirctory ? 
1:0; + this.idValue = String.format("%s_%s",id,directoryFlag); + } + + public ResourceType getType() { + return type; + } + + public void setType(ResourceType type) { + this.type = type; + } + + public List getChildren() { + return children; + } + + public void setChildren(List children) { + this.children = children; + } + + @Override + public String toString() { + return "ResourceComponent{" + + "id=" + id + + ", pid=" + pid + + ", name='" + name + '\'' + + ", currentDir='" + currentDir + '\'' + + ", fullName='" + fullName + '\'' + + ", description='" + description + '\'' + + ", isDirctory=" + isDirctory + + ", idValue='" + idValue + '\'' + + ", type=" + type + + ", children=" + children + + '}'; + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java new file mode 100644 index 0000000000..ce6ce3a011 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.dto.resources.filter; + +import org.apache.dolphinscheduler.dao.entity.Resource; + +import java.util.List; + +/** + * interface filter + */ +public interface IFilter { + List filter(); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java new file mode 100644 index 0000000000..c918a160af --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dolphinscheduler.api.dto.resources.filter; + +import org.apache.dolphinscheduler.dao.entity.Resource; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * resource filter + */ +public class ResourceFilter implements IFilter { + /** + * resource suffix + */ + private String suffix; + /** + * resource list + */ + private List resourceList; + + /** + * parent list + */ + //Set parentList = new HashSet<>(); + + /** + * constructor + * @param suffix resource suffix + * @param resourceList resource list + */ + public ResourceFilter(String suffix, List resourceList) { + this.suffix = suffix; + this.resourceList = resourceList; + } + + /** + * file filter + * @return file filtered by suffix + */ + public Set fileFilter(){ + Set resources = resourceList.stream().filter(t -> { + String alias = t.getAlias(); + return alias.endsWith(suffix); + }).collect(Collectors.toSet()); + return resources; + } + + /** + * list all parent dir + * @return parent resource dir set + */ + Set listAllParent(){ + Set parentList = new HashSet<>(); + Set filterFileList = fileFilter(); + for(Resource file:filterFileList){ + parentList.add(file); + setAllParent(file,parentList); + } + return parentList; + + } + + /** + * list all parent dir + * @param resource resource + * @return parent resource dir set + */ + private void setAllParent(Resource resource,Set parentList){ + for (Resource resourceTemp : resourceList) { + if (resourceTemp.getId() == resource.getPid()) { + parentList.add(resourceTemp); + setAllParent(resourceTemp,parentList); + } + } + } + + @Override + public List filter() { + return new ArrayList<>(listAllParent()); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java new file mode 100644 index 0000000000..5cf118800a --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dolphinscheduler.api.dto.resources.visitor; + + +import org.apache.dolphinscheduler.api.dto.resources.Directory; +import org.apache.dolphinscheduler.api.dto.resources.FileLeaf; +import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; +import org.apache.dolphinscheduler.dao.entity.Resource; + +import java.util.ArrayList; +import java.util.List; + +/** + * resource tree visitor + */ +public class ResourceTreeVisitor implements Visitor{ + + /** + * resource list + */ + private List resourceList; + + public ResourceTreeVisitor() { + } + + /** + * constructor + * @param resourceList resource list + */ + public ResourceTreeVisitor(List resourceList) { + this.resourceList = resourceList; + } + + /** + * visit + * @return resoruce component + */ + public ResourceComponent visit() { + ResourceComponent rootDirectory = new Directory(); + for (Resource resource : resourceList) { + // judge whether is root node + if (rootNode(resource)){ + ResourceComponent tempResourceComponent = getResourceComponent(resource); + rootDirectory.add(tempResourceComponent); + tempResourceComponent.setChildren(setChildren(tempResourceComponent.getId(),resourceList)); + } + } + return rootDirectory; + } + + /** + * set children + * @param id id + * @param list resource list + * @return resource component list + */ + public static List setChildren(int id, List list ){ + List childList = new ArrayList<>(); + for (Resource resource : list) { + if (id == resource.getPid()){ + ResourceComponent tempResourceComponent = getResourceComponent(resource); + childList.add(tempResourceComponent); + } + } + for (ResourceComponent resourceComponent : childList) { + resourceComponent.setChildren(setChildren(resourceComponent.getId(),list)); + } + if (childList.size()==0){ + return new ArrayList<>(); + } + return childList; + } + + /** + * Determine whether it is the root node + * @param resource resource + * @return true if it is the root node + */ + public boolean rootNode(Resource resource) { + + boolean isRootNode = true; + if(resource.getPid() != -1 ){ + for (Resource parent : resourceList) { + if (resource.getPid() == parent.getId()) { + isRootNode = false; + break; + } + } + } + return isRootNode; + } + + /** + * get resource component by resource + * @param resource resource + * @return resource component + */ + private static ResourceComponent getResourceComponent(Resource resource) { + ResourceComponent tempResourceComponent; + if(resource.isDirectory()){ + tempResourceComponent = new Directory(); + }else{ + tempResourceComponent = new FileLeaf(); + } + tempResourceComponent.setName(resource.getAlias()); + tempResourceComponent.setFullName(resource.getFullName().replaceFirst("/","")); + tempResourceComponent.setId(resource.getId()); + tempResourceComponent.setPid(resource.getPid()); + tempResourceComponent.setIdValue(resource.getId(),resource.isDirectory()); + tempResourceComponent.setDescription(resource.getDescription()); + tempResourceComponent.setType(resource.getType()); + return tempResourceComponent; + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java new file mode 100644 index 0000000000..3dfce7c7c1 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java @@ -0,0 +1,31 @@ +package org.apache.dolphinscheduler.api.dto.resources.visitor; + + 
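// Illustrative sketch, not part of the patch itself: how the new ResourceTreeVisitor is expected to be
// driven from a flat resource list, mirroring ResourcesService.queryResourceList. The no-arg Resource
// constructor and the setId/setPid/setAlias/setFullName/setDirectory setters used below are assumed to
// exist on the dao entity; adjust to the real entity API.
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
import org.apache.dolphinscheduler.dao.entity.Resource;

import java.util.Arrays;
import java.util.List;

public class ResourceTreeVisitorSketch {
    public static void main(String[] args) {
        Resource dir = new Resource();
        dir.setId(1); dir.setPid(-1); dir.setAlias("udf"); dir.setFullName("/udf"); dir.setDirectory(true);

        Resource jar = new Resource();
        jar.setId(2); jar.setPid(1); jar.setAlias("demo.jar"); jar.setFullName("/udf/demo.jar"); jar.setDirectory(false);

        List<Resource> flatList = Arrays.asList(dir, jar);

        // visit() returns a synthetic root Directory; its children are the top-level resources,
        // each child's subtree resolved recursively by matching pid against id.
        ResourceComponent root = new ResourceTreeVisitor(flatList).visit();
        System.out.println(root.getChildren());
    }
}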
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Visitor + */ +public interface Visitor { + /** + * visit + * @return resource component + */ + ResourceComponent visit(); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 9955463f8e..416dc0ef54 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -97,7 +97,7 @@ public enum Status { VERIFY_UDF_FUNCTION_NAME_ERROR( 10070,"verify udf function name error", "UDF函数名称验证错误"), DELETE_UDF_FUNCTION_ERROR( 10071,"delete udf function error", "删除UDF函数错误"), AUTHORIZED_FILE_RESOURCE_ERROR( 10072,"authorized file resource error", "授权资源文件错误"), - UNAUTHORIZED_FILE_RESOURCE_ERROR( 10073,"unauthorized file resource error", "查询未授权资源错误"), + AUTHORIZE_RESOURCE_TREE( 10073,"authorize resource tree display error","授权资源目录树错误"), UNAUTHORIZED_UDF_FUNCTION_ERROR( 10074,"unauthorized udf function error", "查询未授权UDF函数错误"), AUTHORIZED_UDF_FUNCTION_ERROR(10075,"authorized udf function error", "授权UDF函数错误"), CREATE_SCHEDULE_ERROR(10076,"create schedule error", "创建调度配置错误"), @@ -184,10 +184,12 @@ public enum Status { RESOURCE_SIZE_EXCEED_LIMIT(20007, "upload resource file size exceeds limit", "上传资源文件大小超过限制"), RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified", "资源文件后缀不支持修改"), UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar", "UDF资源文件后缀名只支持[jar]"), - HDFS_COPY_FAIL(20009, "hdfs copy {0} -> {1} fail", "hdfs复制失败:[{0}] -> [{1}]"), - RESOURCE_FILE_EXIST(20010, "resource file {0} already exists in hdfs,please delete it or change name!", "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"), - RESOURCE_FILE_NOT_EXIST(20011, "resource file {0} not exists in hdfs!", "资源文件[{0}]在hdfs中不存在"), - + HDFS_COPY_FAIL(20010, "hdfs copy {0} -> {1} fail", "hdfs复制失败:[{0}] -> [{1}]"), + RESOURCE_FILE_EXIST(20011, "resource file {0} already exists in hdfs,please delete it or change name!", "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"), + RESOURCE_FILE_NOT_EXIST(20012, "resource file {0} not exists in hdfs!", "资源文件[{0}]在hdfs中不存在"), + UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}","udf函数绑定了资源文件[{0}]"), + RESOURCE_IS_USED(20014, "resource file is used by process definition","资源文件被上线的流程定义使用了"), + PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist","父资源文件不存在"), USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"), diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 44c2b44ebb..734adb9b71 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -38,11 +38,9 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.dao.utils.DagHelper; @@ -162,6 +160,31 @@ public class ProcessDefinitionService extends BaseDAGService { return result; } + /** + * get resource ids + * @param processData process data + * @return resource ids + */ + private String getResourceIds(ProcessData processData) { + List tasks = processData.getTasks(); + Set resourceIds = new HashSet<>(); + for(TaskNode taskNode : tasks){ + String taskParameter = taskNode.getParams(); + AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter); + Set tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet()); + resourceIds.addAll(tempSet); + } + + StringBuilder sb = new StringBuilder(); + for(int i : resourceIds) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(i); + } + return sb.toString(); + } + /** * query proccess definition list @@ -946,7 +969,9 @@ public class ProcessDefinitionService extends BaseDAGService { return result; } + String processDefinitionJson = processDefinition.getProcessDefinitionJson(); + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); //process data check @@ -1163,6 +1188,7 @@ public class ProcessDefinitionService extends BaseDAGService { private DAG genDagGraph(ProcessDefinition processDefinition) throws Exception { String processDefinitionJson = processDefinition.getProcessDefinitionJson(); + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); //check process data diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index f46eda757a..ff87aadbc7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -16,9 +16,15 @@ */ package org.apache.dolphinscheduler.api.service; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; 
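// Illustrative sketch, not part of the patch itself: the id-joining step performed by
// ProcessDefinitionService#getResourceIds above -- resource ids collected from task parameters are
// joined into a comma-separated string. The stream-based join shown here is equivalent to the
// StringBuilder loop in the patch; the sample ids are hypothetical.
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class ResourceIdJoinSketch {
    public static void main(String[] args) {
        Set<Integer> resourceIds = new LinkedHashSet<>();
        resourceIds.add(3);
        resourceIds.add(7);

        String joined = resourceIds.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
        System.out.println(joined); // 3,7
    }
}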
import org.apache.commons.collections.BeanMap; +import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; +import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter; +import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor; +import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; @@ -39,6 +45,7 @@ import org.springframework.web.multipart.MultipartFile; import java.text.MessageFormat; import java.util.*; +import java.util.stream.Collectors; import static org.apache.dolphinscheduler.common.Constants.*; @@ -65,6 +72,82 @@ public class ResourcesService extends BaseService { @Autowired private ResourceUserMapper resourceUserMapper; + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + + /** + * create directory + * + * @param loginUser login user + * @param name alias + * @param description description + * @param type type + * @param pid parent id + * @param currentDir current directory + * @return create directory result + */ + @Transactional(rollbackFor = Exception.class) + public Result createDirectory(User loginUser, + String name, + String description, + ResourceType type, + int pid, + String currentDir) { + Result result = new Result(); + // if hdfs not startup + if (!PropertyUtils.getResUploadStartupState()){ + logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); + putMsg(result, Status.HDFS_NOT_STARTUP); + return result; + } + String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name); + + if (pid != -1) { + Resource parentResource = resourcesMapper.selectById(pid); + + if (parentResource == null) { + putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST); + return result; + } + + if (!hasPerm(loginUser, parentResource.getUserId())) { + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } + } + + + if (checkResourceExists(fullName, 0, type.ordinal())) { + logger.error("resource directory {} has exist, can't recreate", fullName); + putMsg(result, Status.RESOURCE_EXIST); + return result; + } + + Date now = new Date(); + + Resource resource = new Resource(pid,name,fullName,true,description,name,loginUser.getId(),type,0,now,now); + + try { + resourcesMapper.insert(resource); + + putMsg(result, Status.SUCCESS); + Map dataMap = new BeanMap(resource); + Map resultMap = new HashMap(); + for (Map.Entry entry: dataMap.entrySet()) { + if (!"class".equalsIgnoreCase(entry.getKey().toString())) { + resultMap.put(entry.getKey().toString(), entry.getValue()); + } + } + result.setData(resultMap); + } catch (Exception e) { + logger.error("resource already exists, can't recreate ", e); + throw new RuntimeException("resource already exists, can't recreate"); + } + //create directory in hdfs + createDirecotry(loginUser,fullName,type,result); + return result; + } + /** * create resource * @@ -73,6 +156,8 @@ public class ResourcesService extends BaseService { * @param desc description * @param file file * @param type type + * @param pid parent id + * @param currentDir current directory * @return create result code */ @Transactional(rollbackFor = Exception.class) @@ -80,7 +165,9 @@ public class ResourcesService extends BaseService { String name, String desc, ResourceType type, - MultipartFile file) { + MultipartFile file, + int pid, + String currentDir) { 
Result result = new Result(); // if hdfs not startup @@ -123,7 +210,8 @@ public class ResourcesService extends BaseService { } // check resoure name exists - if (checkResourceExists(name, 0, type.ordinal())) { + String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name); + if (checkResourceExists(fullName, 0, type.ordinal())) { logger.error("resource {} has exist, can't recreate", name); putMsg(result, Status.RESOURCE_EXIST); return result; @@ -131,7 +219,9 @@ public class ResourcesService extends BaseService { Date now = new Date(); - Resource resource = new Resource(name,file.getOriginalFilename(),desc,loginUser.getId(),type,file.getSize(),now,now); + + + Resource resource = new Resource(pid,name,fullName,false,desc,file.getOriginalFilename(),loginUser.getId(),type,file.getSize(),now,now); try { resourcesMapper.insert(resource); @@ -151,7 +241,7 @@ public class ResourcesService extends BaseService { } // fail upload - if (!upload(loginUser, name, file, type)) { + if (!upload(loginUser, fullName, file, type)) { logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename()); putMsg(result, Status.HDFS_OPERATION_ERROR); throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename())); @@ -162,27 +252,29 @@ public class ResourcesService extends BaseService { /** * check resource is exists * - * @param alias alias + * @param fullName fullName * @param userId user id * @param type type * @return true if resource exists */ - private boolean checkResourceExists(String alias, int userId, int type ){ - List resources = resourcesMapper.queryResourceList(alias, userId, type); - return CollectionUtils.isNotEmpty(resources); - } + private boolean checkResourceExists(String fullName, int userId, int type ){ + List resources = resourcesMapper.queryResourceList(fullName, userId, type); + if (resources != null && resources.size() > 0) { + return true; + } + return false; + } /** * update resource - * - * @param loginUser login user - * @param name alias - * @param resourceId resource id - * @param type resource type - * @param desc description - * @return update result code + * @param loginUser login user + * @param resourceId resource id + * @param name name + * @param desc description + * @param type resource type + * @return update result code */ @Transactional(rollbackFor = Exception.class) public Result updateResource(User loginUser, @@ -216,7 +308,10 @@ public class ResourcesService extends BaseService { } //check resource aleady exists - if (!resource.getAlias().equals(name) && checkResourceExists(name, 0, type.ordinal())) { + String originFullName = resource.getFullName(); + + String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/")+1),name); + if (!resource.getAlias().equals(name) && checkResourceExists(fullName, 0, type.ordinal())) { logger.error("resource {} already exists, can't recreate", name); putMsg(result, Status.RESOURCE_EXIST); return result; @@ -227,25 +322,41 @@ public class ResourcesService extends BaseService { if (StringUtils.isEmpty(tenantCode)){ return result; } - - //get the file suffix + String nameWithSuffix = name; String originResourceName = resource.getAlias(); - String suffix = originResourceName.substring(originResourceName.lastIndexOf('.')); + if (!resource.isDirectory()) { + //get the file suffix - //if the name without suffix then add it ,else use the origin name - String nameWithSuffix = name; 
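// Illustrative sketch, not part of the patch itself: the full-name rules used throughout this service --
// on create, fullName is currentDir plus the name (without doubling the slash at the root); on rename,
// only the last path segment of the existing fullName is replaced. Sample paths are hypothetical.
public class FullNameRuleSketch {
    // same expression as createDirectory/createResource/onlineCreateResource
    static String childFullName(String currentDir, String name) {
        return currentDir.equals("/") ? String.format("%s%s", currentDir, name)
                                      : String.format("%s/%s", currentDir, name);
    }

    // same expression as updateResource
    static String renamedFullName(String originFullName, String newName) {
        return String.format("%s%s",
                originFullName.substring(0, originFullName.lastIndexOf("/") + 1), newName);
    }

    public static void main(String[] args) {
        System.out.println(childFullName("/", "dir1"));            // /dir1
        System.out.println(childFullName("/dir1", "a.sh"));        // /dir1/a.sh
        System.out.println(renamedFullName("/dir1/a.sh", "b.sh")); // /dir1/b.sh
    }
}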
- if(!name.endsWith(suffix)){ - nameWithSuffix = nameWithSuffix + suffix; + String suffix = originResourceName.substring(originResourceName.lastIndexOf(".")); + + //if the name without suffix then add it ,else use the origin name + if(!name.endsWith(suffix)){ + nameWithSuffix = nameWithSuffix + suffix; + } } // updateResource data + List childrenResource = listAllChildren(resource); + String oldFullName = resource.getFullName(); Date now = new Date(); + resource.setAlias(nameWithSuffix); + resource.setFullName(fullName); resource.setDescription(desc); resource.setUpdateTime(now); try { resourcesMapper.updateById(resource); + if (resource.isDirectory() && CollectionUtils.isNotEmpty(childrenResource)) { + List childResourceList = new ArrayList<>(); + List resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()])); + childResourceList = resourceList.stream().map(t -> { + t.setFullName(t.getFullName().replaceFirst(oldFullName, fullName)); + t.setUpdateTime(now); + return t; + }).collect(Collectors.toList()); + resourcesMapper.batchUpdateResource(childResourceList); + } putMsg(result, Status.SUCCESS); Map dataMap = new BeanMap(resource); @@ -267,15 +378,9 @@ public class ResourcesService extends BaseService { // get file hdfs path // delete hdfs file by type - String originHdfsFileName = ""; - String destHdfsFileName = ""; - if (resource.getType().equals(ResourceType.FILE)) { - originHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, originResourceName); - destHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, name); - } else if (resource.getType().equals(ResourceType.UDF)) { - originHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, originResourceName); - destHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, name); - } + String originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,originFullName); + String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName); + try { if (HadoopUtils.getInstance().exists(originHdfsFileName)) { logger.info("hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName); @@ -303,7 +408,7 @@ public class ResourcesService extends BaseService { * @param pageSize page size * @return resource list page */ - public Map queryResourceListPaging(User loginUser, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) { + public Map queryResourceListPaging(User loginUser, int direcotryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) { HashMap result = new HashMap<>(5); Page page = new Page(pageNo, pageSize); @@ -312,7 +417,7 @@ public class ResourcesService extends BaseService { userId= 0; } IPage resourceIPage = resourcesMapper.queryResourcePaging(page, - userId, type.ordinal(), searchVal); + userId,direcotryId, type.ordinal(), searchVal); PageInfo pageInfo = new PageInfo(pageNo, pageSize); pageInfo.setTotalCount((int)resourceIPage.getTotal()); pageInfo.setLists(resourceIPage.getRecords()); @@ -321,17 +426,46 @@ public class ResourcesService extends BaseService { return result; } + /** + * create direcoty + * @param loginUser login user + * @param fullName full name + * @param type resource type + * @param result Result + */ + private void createDirecotry(User loginUser,String fullName,ResourceType type,Result result){ + // query tenant + String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode(); + String directoryName = HadoopUtils.getHdfsFileName(type,tenantCode,fullName); + 
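// Illustrative sketch, not part of the patch itself: the cascade applied in updateResource when a
// directory is renamed -- each child's fullName has the old directory prefix replaced by the new one
// before resourcesMapper.batchUpdateResource is called. Paths below are hypothetical.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ChildRenameSketch {
    public static void main(String[] args) {
        String oldFullName = "/dirA";
        String fullName = "/dirB";

        List<String> children = Arrays.asList("/dirA/a.sh", "/dirA/sub/b.sh");
        List<String> renamed = children.stream()
                .map(child -> child.replaceFirst(oldFullName, fullName))
                .collect(Collectors.toList());

        System.out.println(renamed); // [/dirB/a.sh, /dirB/sub/b.sh]
    }
}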
String resourceRootPath = HadoopUtils.getHdfsDir(type,tenantCode); + try { + if (!HadoopUtils.getInstance().exists(resourceRootPath)) { + createTenantDirIfNotExists(tenantCode); + } + + if (!HadoopUtils.getInstance().mkdir(directoryName)) { + logger.error("create resource directory {} of hdfs failed",directoryName); + putMsg(result,Status.HDFS_OPERATION_ERROR); + throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName)); + } + } catch (Exception e) { + logger.error("create resource directory {} of hdfs failed",directoryName); + putMsg(result,Status.HDFS_OPERATION_ERROR); + throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName)); + } + } + /** * upload file to hdfs * - * @param loginUser - * @param name - * @param file + * @param loginUser login user + * @param fullName full name + * @param file file */ - private boolean upload(User loginUser, String name, MultipartFile file, ResourceType type) { + private boolean upload(User loginUser, String fullName, MultipartFile file, ResourceType type) { // save to local String fileSuffix = FileUtils.suffix(file.getOriginalFilename()); - String nameSuffix = FileUtils.suffix(name); + String nameSuffix = FileUtils.suffix(fullName); // determine file suffix if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) { @@ -344,15 +478,8 @@ public class ResourcesService extends BaseService { // save file to hdfs, and delete original file - String hdfsFilename = ""; - String resourcePath = ""; - if (type.equals(ResourceType.FILE)) { - hdfsFilename = HadoopUtils.getHdfsFilename(tenantCode, name); - resourcePath = HadoopUtils.getHdfsResDir(tenantCode); - } else if (type.equals(ResourceType.UDF)) { - hdfsFilename = HadoopUtils.getHdfsUdfFilename(tenantCode, name); - resourcePath = HadoopUtils.getHdfsUdfDir(tenantCode); - } + String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName); + String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode); try { // if tenant dir not exists if (!HadoopUtils.getInstance().exists(resourcePath)) { @@ -377,13 +504,59 @@ public class ResourcesService extends BaseService { public Map queryResourceList(User loginUser, ResourceType type) { Map result = new HashMap<>(5); - List resourceList; + + Set allResourceList = getAllResources(loginUser, type); + Visitor resourceTreeVisitor = new ResourceTreeVisitor(new ArrayList<>(allResourceList)); + //JSONArray jsonArray = JSON.parseArray(JSON.toJSONString(resourceTreeVisitor.visit().getChildren(), SerializerFeature.SortField)); + result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren()); + putMsg(result,Status.SUCCESS); + + return result; + } + + /** + * get all resources + * @param loginUser login user + * @return all resource set + */ + private Set getAllResources(User loginUser, ResourceType type) { int userId = loginUser.getId(); + boolean listChildren = true; if(isAdmin(loginUser)){ userId = 0; + listChildren = false; + } + List resourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal()); + Set allResourceList = new HashSet<>(resourceList); + if (listChildren) { + Set authorizedIds = new HashSet<>(); + List authorizedDirecoty = resourceList.stream().filter(t->t.getUserId() != loginUser.getId() && t.isDirectory()).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(authorizedDirecoty)) { + for(Resource resource : authorizedDirecoty){ + authorizedIds.addAll(listAllChildren(resource)); + } + List 
childrenResources = resourcesMapper.listResourceByIds(authorizedIds.toArray(new Integer[authorizedIds.size()])); + allResourceList.addAll(childrenResources); + } } - resourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal()); - result.put(Constants.DATA_LIST, resourceList); + return allResourceList; + } + + /** + * query resource list + * + * @param loginUser login user + * @param type resource type + * @return resource list + */ + public Map queryResourceJarList(User loginUser, ResourceType type) { + + Map result = new HashMap<>(5); + + Set allResourceList = getAllResources(loginUser, type); + List resources = new ResourceFilter(".jar",new ArrayList<>(allResourceList)).filter(); + Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources); + result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren()); putMsg(result,Status.SUCCESS); return result; @@ -419,23 +592,51 @@ public class ResourcesService extends BaseService { putMsg(result, Status.USER_NO_OPERATION_PERM); return result; } + //if resource type is UDF,need check whether it is bound by UDF functon + if (resource.getType() == (ResourceType.UDF)) { + List udfFuncs = udfFunctionMapper.listUdfByResourceId(new int[]{resourceId}); + if (CollectionUtils.isNotEmpty(udfFuncs)) { + logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString()); + putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName()); + return result; + } + } - Tenant tenant = tenantMapper.queryById(loginUser.getTenantId()); - if (tenant == null){ - putMsg(result, Status.TENANT_NOT_EXIST); + String tenantCode = getTenantCode(resource.getUserId(),result); + if (StringUtils.isEmpty(tenantCode)){ + return result; + } + + // get all resource id of process definitions those is released + Map> resourceProcessMap = getResourceProcessMap(); + Set resourceIdSet = resourceProcessMap.keySet(); + // get all children of the resource + List allChildren = listAllChildren(resource); + + if (resourceIdSet.contains(resource.getPid())) { + logger.error("can't be deleted,because it is used of process definition"); + putMsg(result, Status.RESOURCE_IS_USED); + return result; + } + resourceIdSet.retainAll(allChildren); + if (CollectionUtils.isNotEmpty(resourceIdSet)) { + logger.error("can't be deleted,because it is used of process definition"); + for (Integer resId : resourceIdSet) { + logger.error("resource id:{} is used of process definition {}",resId,resourceProcessMap.get(resId)); + } + putMsg(result, Status.RESOURCE_IS_USED); return result; } - String hdfsFilename = ""; - // delete hdfs file by type - String tenantCode = tenant.getTenantCode(); - hdfsFilename = getHdfsFileName(resource, tenantCode, hdfsFilename); + // get hdfs file by type + String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName()); //delete data in database - resourcesMapper.deleteById(resourceId); + resourcesMapper.deleteIds(allChildren.toArray(new Integer[allChildren.size()])); resourceUserMapper.deleteResourceUser(0, resourceId); + //delete file on hdfs - HadoopUtils.getInstance().delete(hdfsFilename, false); + HadoopUtils.getInstance().delete(hdfsFilename, true); putMsg(result, Status.SUCCESS); return result; @@ -444,15 +645,15 @@ public class ResourcesService extends BaseService { /** * verify resource by name and type * @param loginUser login user - * @param name resource alias - * @param type resource type + * @param fullName resource full name + * @param type resource type * 
@return true if the resource name not exists, otherwise return false */ - public Result verifyResourceName(String name, ResourceType type,User loginUser) { + public Result verifyResourceName(String fullName, ResourceType type,User loginUser) { Result result = new Result(); putMsg(result, Status.SUCCESS); - if (checkResourceExists(name, 0, type.ordinal())) { - logger.error("resource type:{} name:{} has exist, can't create again.", type, name); + if (checkResourceExists(fullName, 0, type.ordinal())) { + logger.error("resource type:{} name:{} has exist, can't create again.", type, fullName); putMsg(result, Status.RESOURCE_EXIST); } else { // query tenant @@ -461,9 +662,9 @@ public class ResourcesService extends BaseService { String tenantCode = tenant.getTenantCode(); try { - String hdfsFilename = getHdfsFileName(type,tenantCode,name); + String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName); if(HadoopUtils.getInstance().exists(hdfsFilename)){ - logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, name,hdfsFilename); + logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, fullName,hdfsFilename); putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename); } @@ -480,6 +681,48 @@ public class ResourcesService extends BaseService { return result; } + /** + * verify resource by full name or pid and type + * @param fullName resource full name + * @param id resource id + * @param type resource type + * @return true if the resource full name or pid not exists, otherwise return false + */ + public Result queryResource(String fullName,Integer id,ResourceType type) { + Result result = new Result(); + if (StringUtils.isBlank(fullName) && id == null) { + logger.error("You must input one of fullName and pid"); + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR); + return result; + } + if (StringUtils.isNotBlank(fullName)) { + List resourceList = resourcesMapper.queryResource(fullName,type.ordinal()); + if (CollectionUtils.isEmpty(resourceList)) { + logger.error("resource file not exist, resource full name {} ", fullName); + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + putMsg(result, Status.SUCCESS); + result.setData(resourceList.get(0)); + } else { + Resource resource = resourcesMapper.selectById(id); + if (resource == null) { + logger.error("resource file not exist, resource id {}", id); + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + Resource parentResource = resourcesMapper.selectById(resource.getPid()); + if (parentResource == null) { + logger.error("parent resource file not exist, resource id {}", id); + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + putMsg(result, Status.SUCCESS); + result.setData(parentResource); + } + return result; + } + /** * view resource file online * @@ -501,7 +744,7 @@ public class ResourcesService extends BaseService { // get resource by id Resource resource = resourcesMapper.selectById(resourceId); if (resource == null) { - logger.error("resouce file not exist, resource id {}", resourceId); + logger.error("resource file not exist, resource id {}", resourceId); putMsg(result, Status.RESOURCE_NOT_EXIST); return result; } @@ -511,7 +754,7 @@ public class ResourcesService extends BaseService { if (StringUtils.isNotEmpty(resourceViewSuffixs)) { List strList = Arrays.asList(resourceViewSuffixs.split(",")); if (!strList.contains(nameSuffix)) { - logger.error("resouce suffix {} not support view, resource id {}", nameSuffix, 
resourceId); + logger.error("resource suffix {} not support view, resource id {}", nameSuffix, resourceId); putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW); return result; } @@ -523,7 +766,7 @@ public class ResourcesService extends BaseService { } // hdfs path - String hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias()); + String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getFullName()); logger.info("resource hdfs path is {} ", hdfsFileName); try { if(HadoopUtils.getInstance().exists(hdfsFileName)){ @@ -559,7 +802,7 @@ public class ResourcesService extends BaseService { * @return create result code */ @Transactional(rollbackFor = Exception.class) - public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content) { + public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory) { Result result = new Result(); // if resource upload startup if (!PropertyUtils.getResUploadStartupState()){ @@ -581,15 +824,16 @@ public class ResourcesService extends BaseService { } String name = fileName.trim() + "." + nameSuffix; + String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name); - result = verifyResourceName(name,type,loginUser); + result = verifyResourceName(fullName,type,loginUser); if (!result.getCode().equals(Status.SUCCESS.getCode())) { return result; } // save data Date now = new Date(); - Resource resource = new Resource(name,name,desc,loginUser.getId(),type,content.getBytes().length,now,now); + Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now); resourcesMapper.insert(resource); @@ -605,7 +849,7 @@ public class ResourcesService extends BaseService { String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode(); - result = uploadContentToHdfs(name, tenantCode, content); + result = uploadContentToHdfs(fullName, tenantCode, content); if (!result.getCode().equals(Status.SUCCESS.getCode())) { throw new RuntimeException(result.getMsg()); } @@ -657,7 +901,7 @@ public class ResourcesService extends BaseService { resourcesMapper.updateById(resource); - result = uploadContentToHdfs(resource.getAlias(), tenantCode, content); + result = uploadContentToHdfs(resource.getFullName(), tenantCode, content); if (!result.getCode().equals(Status.SUCCESS.getCode())) { throw new RuntimeException(result.getMsg()); } @@ -665,10 +909,10 @@ public class ResourcesService extends BaseService { } /** - * @param resourceName - * @param tenantCode - * @param content - * @return + * @param resourceName resource name + * @param tenantCode tenant code + * @param content content + * @return result */ private Result uploadContentToHdfs(String resourceName, String tenantCode, String content) { Result result = new Result(); @@ -684,8 +928,8 @@ public class ResourcesService extends BaseService { return result; } - // get file hdfs path - hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resourceName); + // get resource file hdfs path + hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName); String resourcePath = HadoopUtils.getHdfsResDir(tenantCode); logger.info("resource hdfs path is {} ", hdfsFileName); @@ -729,11 +973,14 @@ public class ResourcesService extends BaseService { logger.error("download 
file not exist, resource id {}", resourceId); return null; } + if (resource.isDirectory()) { + logger.error("resource id {} is a directory, can't download it", resourceId); + throw new RuntimeException("can't download a directory"); + } User user = userMapper.queryDetailsById(resource.getUserId()); String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode(); - String hdfsFileName = ""; - hdfsFileName = getHdfsFileName(resource, tenantCode, hdfsFileName); + String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getAlias()); String localFileName = FileUtils.getDownloadFilename(resource.getAlias()); logger.info("resource hdfs path is {} ", hdfsFileName); @@ -743,6 +990,33 @@ public class ResourcesService extends BaseService { } + /** + * list the resources that can be authorized to the specified user, as a tree + * + * @param loginUser login user + * @param userId user id + * @return authorizable resource tree + */ + public Map authorizeResourceTree(User loginUser, Integer userId) { + + Map result = new HashMap<>(); + if (checkAdmin(loginUser, result)) { + return result; + } + List resourceList = resourcesMapper.queryResourceExceptUserId(userId); + List list ; + if (CollectionUtils.isNotEmpty(resourceList)) { + Visitor visitor = new ResourceTreeVisitor(resourceList); + list = visitor.visit().getChildren(); + }else { + list = new ArrayList<>(0); + } + + result.put(Constants.DATA_LIST, list); + putMsg(result,Status.SUCCESS); + return result; + } + /** * unauthorized file * @@ -757,8 +1031,8 @@ public class ResourcesService extends BaseService { return result; } List resourceList = resourcesMapper.queryResourceExceptUserId(userId); - List list ; - if (CollectionUtils.isNotEmpty(resourceList)) { + List list ; + if (resourceList != null && resourceList.size() > 0) { Set resourceSet = new HashSet<>(resourceList); List authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId); @@ -767,15 +1041,12 @@ public class ResourcesService extends BaseService { }else { list = new ArrayList<>(0); } - - result.put(Constants.DATA_LIST, list); + Visitor visitor = new ResourceTreeVisitor(list); + result.put(Constants.DATA_LIST, visitor.visit().getChildren()); putMsg(result,Status.SUCCESS); return result; } - - - /** * unauthorized udf function * @@ -841,46 +1112,15 @@ public class ResourcesService extends BaseService { return result; } List authedResources = resourcesMapper.queryAuthorizedResourceList(userId); - - result.put(Constants.DATA_LIST, authedResources); + Visitor visitor = new ResourceTreeVisitor(authedResources); + logger.info(JSON.toJSONString(visitor.visit(), SerializerFeature.SortField)); + String jsonTreeStr = JSON.toJSONString(visitor.visit().getChildren(), SerializerFeature.SortField); + logger.info(jsonTreeStr); + result.put(Constants.DATA_LIST, visitor.visit().getChildren()); putMsg(result,Status.SUCCESS); return result; } - /** - * get hdfs file name - * - * @param resource resource - * @param tenantCode tenant code - * @param hdfsFileName hdfs file name - * @return hdfs file name - */ - private String getHdfsFileName(Resource resource, String tenantCode, String hdfsFileName) { - if (resource.getType().equals(ResourceType.FILE)) { - hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias()); - } else if (resource.getType().equals(ResourceType.UDF)) { - hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, resource.getAlias()); - } - return hdfsFileName; - } - - /** - * get hdfs file name - * - * @param resourceType resource type - * @param tenantCode tenant code - *
@param hdfsFileName hdfs file name - * @return hdfs file name - */ - private String getHdfsFileName(ResourceType resourceType, String tenantCode, String hdfsFileName) { - if (resourceType.equals(ResourceType.FILE)) { - hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, hdfsFileName); - } else if (resourceType.equals(ResourceType.UDF)) { - hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, hdfsFileName); - } - return hdfsFileName; - } - /** * get authorized resource list * @@ -920,4 +1160,69 @@ public class ResourcesService extends BaseService { return tenant.getTenantCode(); } + /** + * list all children id + * @param resource resource + * @return all children id + */ + List listAllChildren(Resource resource){ + List childList = new ArrayList<>(); + if (resource.getId() != -1) { + childList.add(resource.getId()); + } + + if(resource.isDirectory()){ + listAllChildren(resource.getId(),childList); + } + return childList; + } + + /** + * list all children id + * @param resourceId resource id + * @param childList child list + */ + void listAllChildren(int resourceId,List childList){ + + List children = resourcesMapper.listChildren(resourceId); + for(int chlidId:children){ + childList.add(chlidId); + listAllChildren(chlidId,childList); + } + } + + /** + * get resource process map key is resource id,value is the set of process definition + * @return resource process definition map + */ + private Map> getResourceProcessMap(){ + Map map = new HashMap<>(); + Map> result = new HashMap<>(); + List> list = processDefinitionMapper.listResources(); + if (CollectionUtils.isNotEmpty(list)) { + for (Map tempMap : list) { + + map.put((Integer) tempMap.get("id"), (String)tempMap.get("resource_ids")); + } + } + + for (Map.Entry entry : map.entrySet()) { + Integer mapKey = entry.getKey(); + String[] arr = entry.getValue().split(","); + Set mapValues = Arrays.stream(arr).map(Integer::parseInt).collect(Collectors.toSet()); + for (Integer value : mapValues) { + if (result.containsKey(value)) { + Set set = result.get(value); + set.add(mapKey); + result.put(value, set); + } else { + Set set = new HashSet<>(); + set.add(mapKey); + result.put(value, set); + } + } + } + return result; + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java index 249c7ec8df..8a0bf748bb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java @@ -118,7 +118,7 @@ public class UdfFuncService extends BaseService{ } udf.setDescription(desc); udf.setResourceId(resourceId); - udf.setResourceName(resource.getAlias()); + udf.setResourceName(resource.getFullName()); udf.setType(type); udf.setCreateTime(now); @@ -226,7 +226,7 @@ public class UdfFuncService extends BaseService{ } udf.setDescription(desc); udf.setResourceId(resourceId); - udf.setResourceName(resource.getAlias()); + udf.setResourceName(resource.getFullName()); udf.setType(type); udf.setUpdateTime(now); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java new file mode 100644 index 0000000000..8a4a16c4f0 --- /dev/null +++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.dto.resources.filter; + +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * resource filter test + */ +public class ResourceFilterTest { + private static Logger logger = LoggerFactory.getLogger(ResourceFilterTest.class); + @Test + public void filterTest(){ + List allList = new ArrayList<>(); + + Resource resource1 = new Resource(3,-1,"b","/b",true); + Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false); + Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false); + Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false); + Resource resource5 = new Resource(7,-1,"b2","/b2",true); + Resource resource6 = new Resource(8,-1,"b2","/b/b2",true); + Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false); + allList.add(resource1); + allList.add(resource2); + allList.add(resource3); + allList.add(resource4); + allList.add(resource5); + allList.add(resource6); + allList.add(resource7); + + + ResourceFilter resourceFilter = new ResourceFilter(".jar",allList); + List resourceList = resourceFilter.filter(); + Assert.assertNotNull(resourceList); + resourceList.stream().forEach(t-> logger.info(t.toString())); + } +} \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java new file mode 100644 index 0000000000..d1f8a12012 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.dto.resources.visitor; + +import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * resource tree visitor test + */ +public class ResourceTreeVisitorTest { + + @Test + public void visit() throws Exception { + List resourceList = new ArrayList<>(); + + Resource resource1 = new Resource(3,-1,"b","/b",true); + Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false); + Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false); + Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false); + Resource resource5 = new Resource(7,-1,"b2","/b2",true); + Resource resource6 = new Resource(8,-1,"b2","/b/b2",true); + Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false); + resourceList.add(resource1); + resourceList.add(resource2); + resourceList.add(resource3); + resourceList.add(resource4); + resourceList.add(resource5); + resourceList.add(resource6); + resourceList.add(resource7); + + ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList); + ResourceComponent resourceComponent = resourceTreeVisitor.visit(); + Assert.assertNotNull(resourceComponent.getChildren()); + } + + @Test + public void rootNode() throws Exception { + List resourceList = new ArrayList<>(); + + Resource resource1 = new Resource(3,-1,"b","/b",true); + Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false); + Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false); + Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false); + Resource resource5 = new Resource(7,-1,"b2","/b2",true); + Resource resource6 = new Resource(8,-1,"b2","/b/b2",true); + Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false); + resourceList.add(resource1); + resourceList.add(resource2); + resourceList.add(resource3); + resourceList.add(resource4); + resourceList.add(resource5); + resourceList.add(resource6); + resourceList.add(resource7); + + ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList); + Assert.assertTrue(resourceTreeVisitor.rootNode(resource1)); + Assert.assertTrue(resourceTreeVisitor.rootNode(resource2)); + Assert.assertFalse(resourceTreeVisitor.rootNode(resource3)); + + } + +} \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index 6d07ebd99c..d73eba8bdc 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -24,10 +24,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UserType; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.FileUtils; -import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import 
org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; @@ -40,6 +37,7 @@ import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; +import org.omg.CORBA.Any; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -73,6 +71,8 @@ public class ResourcesServiceTest { private UserMapper userMapper; @Mock private UdfFuncMapper udfFunctionMapper; + @Mock + private ProcessDefinitionMapper processDefinitionMapper; @Before public void setUp() { @@ -96,14 +96,14 @@ public class ResourcesServiceTest { PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); User user = new User(); //HDFS_NOT_STARTUP - Result result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); + Result result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null,-1,"/"); logger.info(result.toString()); Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg()); //RESOURCE_FILE_IS_EMPTY MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf",new String().getBytes()); PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); - result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile); + result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile,-1,"/"); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_FILE_IS_EMPTY.getMsg(),result.getMsg()); @@ -111,31 +111,42 @@ public class ResourcesServiceTest { mockMultipartFile = new MockMultipartFile("test.pdf","test.pdf","pdf",new String("test").getBytes()); PowerMockito.when(FileUtils.suffix("test.pdf")).thenReturn("pdf"); PowerMockito.when(FileUtils.suffix("ResourcesServiceTest.jar")).thenReturn("jar"); - result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile); + result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile,-1,"/"); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_SUFFIX_FORBID_CHANGE.getMsg(),result.getMsg()); //UDF_RESOURCE_SUFFIX_NOT_JAR mockMultipartFile = new MockMultipartFile("ResourcesServiceTest.pdf","ResourcesServiceTest.pdf","pdf",new String("test").getBytes()); PowerMockito.when(FileUtils.suffix("ResourcesServiceTest.pdf")).thenReturn("pdf"); - result = resourcesService.createResource(user,"ResourcesServiceTest.pdf","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile); + result = resourcesService.createResource(user,"ResourcesServiceTest.pdf","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile,-1,"/"); logger.info(result.toString()); Assert.assertEquals(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg(),result.getMsg()); - //UDF_RESOURCE_SUFFIX_NOT_JAR - Mockito.when(tenantMapper.queryById(0)).thenReturn(getTenant()); - Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest.jar", 0, 1)).thenReturn(getResourceList()); - mockMultipartFile = new 
MockMultipartFile("ResourcesServiceTest.jar","ResourcesServiceTest.jar","pdf",new String("test").getBytes()); - result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile); + } + + @Test + public void testCreateDirecotry(){ + + PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); + User user = new User(); + //HDFS_NOT_STARTUP + Result result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,-1,"/"); logger.info(result.toString()); - Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg()); + Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg()); - //SUCCESS - Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest.jar", 0, 1)).thenReturn(new ArrayList<>()); - result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile); + //PARENT_RESOURCE_NOT_EXIST + PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); + Mockito.when(resourcesMapper.selectById(Mockito.anyInt())).thenReturn(null); + result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,1,"/"); logger.info(result.toString()); - Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); + Assert.assertEquals(Status.PARENT_RESOURCE_NOT_EXIST.getMsg(),result.getMsg()); + //RESOURCE_EXIST + PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); + Mockito.when(resourcesMapper.queryResourceList("/directoryTest", 0, 0)).thenReturn(getResourceList()); + result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,-1,"/"); + logger.info(result.toString()); + Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg()); } @@ -163,41 +174,46 @@ public class ResourcesServiceTest { //SUCCESS user.setId(1); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest.jar",ResourceType.FILE); + Mockito.when(userMapper.queryDetailsById(1)).thenReturn(getUser()); + Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); + + result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); //RESOURCE_EXIST - Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList()); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.FILE); + Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList()); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg()); //USER_NOT_EXIST - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF); + Mockito.when(userMapper.queryDetailsById(Mockito.anyInt())).thenReturn(null); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF); logger.info(result.toString()); Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode()); //TENANT_NOT_EXIST 
Mockito.when(userMapper.queryDetailsById(1)).thenReturn(getUser()); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF); + Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF); logger.info(result.toString()); Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg()); //RESOURCE_NOT_EXIST Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); - PowerMockito.when(HadoopUtils.getHdfsFilename(Mockito.any(), Mockito.any())).thenReturn("test1"); + PowerMockito.when(HadoopUtils.getHdfsResourceFileName(Mockito.any(), Mockito.any())).thenReturn("test1"); try { Mockito.when(hadoopUtils.exists("test")).thenReturn(true); } catch (IOException e) { e.printStackTrace(); } - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg()); //SUCCESS - PowerMockito.when(HadoopUtils.getHdfsFilename(Mockito.any(), Mockito.any())).thenReturn("test"); + PowerMockito.when(HadoopUtils.getHdfsResourceFileName(Mockito.any(), Mockito.any())).thenReturn("test"); result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); @@ -212,8 +228,8 @@ public class ResourcesServiceTest { resourcePage.setTotal(1); resourcePage.setRecords(getResourceList()); Mockito.when(resourcesMapper.queryResourcePaging(Mockito.any(Page.class), - Mockito.eq(0), Mockito.eq(0), Mockito.eq("test"))).thenReturn(resourcePage); - Map result = resourcesService.queryResourceListPaging(loginUser,ResourceType.FILE,"test",1,10); + Mockito.eq(0),Mockito.eq(-1), Mockito.eq(0), Mockito.eq("test"))).thenReturn(resourcePage); + Map result = resourcesService.queryResourceListPaging(loginUser,-1,ResourceType.FILE,"test",1,10); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); PageInfo pageInfo = (PageInfo) result.get(Constants.DATA_LIST); @@ -263,6 +279,7 @@ public class ResourcesServiceTest { //TENANT_NOT_EXIST loginUser.setUserType(UserType.ADMIN_USER); loginUser.setTenantId(2); + Mockito.when(userMapper.queryDetailsById(Mockito.anyInt())).thenReturn(loginUser); result = resourcesService.delete(loginUser,1); logger.info(result.toString()); Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(), result.getMsg()); @@ -285,14 +302,20 @@ public class ResourcesServiceTest { User user = new User(); user.setId(1); - Mockito.when(resourcesMapper.queryResourceList("test", 0, 0)).thenReturn(getResourceList()); - Result result = resourcesService.verifyResourceName("test",ResourceType.FILE,user); + Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest.jar", 0, 0)).thenReturn(getResourceList()); + Result result = resourcesService.verifyResourceName("/ResourcesServiceTest.jar",ResourceType.FILE,user); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(), result.getMsg()); //TENANT_NOT_EXIST Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); - result = 
resourcesService.verifyResourceName("test1",ResourceType.FILE,user); + String unExistFullName = "/test.jar"; + try { + Mockito.when(hadoopUtils.exists(unExistFullName)).thenReturn(false); + } catch (IOException e) { + logger.error("hadoop error",e); + } + result = resourcesService.verifyResourceName("/test.jar",ResourceType.FILE,user); logger.info(result.toString()); Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(), result.getMsg()); @@ -304,10 +327,10 @@ public class ResourcesServiceTest { } catch (IOException e) { logger.error("hadoop error",e); } - PowerMockito.when(HadoopUtils.getHdfsFilename("123", "test1")).thenReturn("test"); - result = resourcesService.verifyResourceName("test1",ResourceType.FILE,user); + PowerMockito.when(HadoopUtils.getHdfsResourceFileName("123", "test1")).thenReturn("test"); + result = resourcesService.verifyResourceName("/ResourcesServiceTest.jar",ResourceType.FILE,user); logger.info(result.toString()); - Assert.assertTrue(Status.RESOURCE_FILE_EXIST.getCode()==result.getCode()); + Assert.assertTrue(Status.RESOURCE_EXIST.getCode()==result.getCode()); //SUCCESS result = resourcesService.verifyResourceName("test2",ResourceType.FILE,user); @@ -389,14 +412,14 @@ public class ResourcesServiceTest { PowerMockito.when(HadoopUtils.getHdfsUdfDir("udfDir")).thenReturn("udfDir"); User user = getUser(); //HDFS_NOT_STARTUP - Result result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content"); + Result result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/"); logger.info(result.toString()); Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg()); //RESOURCE_SUFFIX_NOT_SUPPORT_VIEW PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); PowerMockito.when(FileUtils.getResourceViewSuffixs()).thenReturn("class"); - result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content"); + result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/"); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(),result.getMsg()); @@ -404,7 +427,7 @@ public class ResourcesServiceTest { try { PowerMockito.when(FileUtils.getResourceViewSuffixs()).thenReturn("jar"); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); - result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "desc", "content"); + result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "desc", "content",-1,"/"); }catch (RuntimeException ex){ logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), ex.getMessage()); @@ -413,7 +436,7 @@ public class ResourcesServiceTest { //SUCCESS Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test"); PowerMockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true); - result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content"); + result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/"); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); @@ -584,13 +607,26 @@ public class ResourcesServiceTest { private Resource getResource(){ Resource resource = new Resource(); + resource.setPid(-1); resource.setUserId(1); 
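// --- Illustrative sketch, not part of the patch: how the resource full names used in the
// tests above are derived. createResource/onlineCreateResource build them from currentDir
// and the file name with the ternary shown earlier in this diff; the helper below is a
// standalone rendering of that rule (the names are made up).
class FullNameSketch {
    static String fullName(String currentDir, String name) {
        return currentDir.equals("/")
                ? String.format("%s%s", currentDir, name)    // "/" + "a.jar"  -> "/a.jar"
                : String.format("%s/%s", currentDir, name);  // "/b" + "a.jar" -> "/b/a.jar"
    }

    public static void main(String[] args) {
        System.out.println(fullName("/", "ResourcesServiceTest.jar"));   // /ResourcesServiceTest.jar
        System.out.println(fullName("/b/b2", "c2.jar"));                 // /b/b2/c2.jar
    }
}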
resource.setDescription("ResourcesServiceTest.jar"); resource.setAlias("ResourcesServiceTest.jar"); + resource.setFullName("/ResourcesServiceTest.jar"); resource.setType(ResourceType.FILE); return resource; } + private Resource getUdfResource(){ + + Resource resource = new Resource(); + resource.setUserId(1); + resource.setDescription("udfTest"); + resource.setAlias("udfTest.jar"); + resource.setFullName("/udfTest.jar"); + resource.setType(ResourceType.UDF); + return resource; + } + private UdfFunc getUdfFunc(){ UdfFunc udfFunc = new UdfFunc(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java index 308ed8e9b6..ccc231fcf6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java @@ -43,6 +43,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Map; import static org.junit.Assert.*; @@ -173,7 +174,11 @@ public class CheckUtilsTest { // MapreduceParameters MapreduceParameters mapreduceParameters = new MapreduceParameters(); assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString())); - mapreduceParameters.setMainJar(new ResourceInfo()); + + ResourceInfo resourceInfoMapreduce = new ResourceInfo(); + resourceInfoMapreduce.setId(1); + resourceInfoMapreduce.setRes(""); + mapreduceParameters.setMainJar(resourceInfoMapreduce); mapreduceParameters.setProgramType(ProgramType.JAVA); assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString())); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java index 1c371e799e..633f5f9623 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java @@ -23,13 +23,17 @@ import com.baomidou.mybatisplus.annotation.EnumValue; */ public enum AuthorizationType { /** - * 0 RESOURCE_FILE; + * 0 RESOURCE_FILE_ID; + * 0 RESOURCE_FILE_NAME; + * 1 UDF_FILE; * 1 DATASOURCE; * 2 UDF; */ - RESOURCE_FILE(0, "resource file"), - DATASOURCE(1, "data source"), - UDF(2, "udf function"); + RESOURCE_FILE_ID(0, "resource file id"), + RESOURCE_FILE_NAME(1, "resource file name"), + UDF_FILE(2, "udf file"), + DATASOURCE(3, "data source"), + UDF(4, "udf function"); AuthorizationType(int code, String descp){ this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java index 3c95ac648b..a7fc0839eb 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java @@ -23,6 +23,16 @@ public class ResourceInfo { /** * res the name of the resource that was uploaded */ + private int id; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + 
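    // --- Illustrative usage sketch, not part of the patch: with the new id field a task
    // definition can reference an uploaded resource by database id as well as by name
    // (see the List<ResourceInfo> changes to getResourceFilesList() later in this diff).
    // The id and file name below are made up.
    static ResourceInfo exampleMainJar() {
        ResourceInfo mainJar = new ResourceInfo();
        mainJar.setId(6);            // id of the resource row, e.g. /b/b2.jar
        mainJar.setRes("b2.jar");    // res still carries the file name
        return mainJar;
    }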
private String res; public String getRes() { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java index 2d0322a6d7..ae78caf881 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.common.task; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import java.util.LinkedHashMap; import java.util.List; @@ -31,7 +32,7 @@ public abstract class AbstractParameters implements IParameters { public abstract boolean checkParameters(); @Override - public abstract List getResourceFilesList(); + public abstract List getResourceFilesList(); /** * local parameters diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java index 8fb49eb1fa..88a2b54761 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.common.task; +import org.apache.dolphinscheduler.common.process.ResourceInfo; + import java.util.List; /** @@ -34,5 +36,5 @@ public interface IParameters { * * @return resource files list */ - List getResourceFilesList(); + List getResourceFilesList(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java index 5714b5ef3e..7f0f2c8079 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task.conditions; import org.apache.dolphinscheduler.common.enums.DependentRelation; import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import java.util.List; @@ -41,7 +42,7 @@ public class ConditionsParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { + public List getResourceFilesList() { return null; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java index f153360d63..872b3aa174 100755 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import 
org.apache.dolphinscheduler.common.task.AbstractParameters; /** @@ -198,7 +199,7 @@ public class DataxParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { + public List getResourceFilesList() { return new ArrayList<>(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java index 9ff1405722..5f2e0e1853 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task.dependent; import org.apache.dolphinscheduler.common.enums.DependentRelation; import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import java.util.ArrayList; @@ -36,7 +37,7 @@ public class DependentParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { + public List getResourceFilesList() { return new ArrayList<>(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java index 1fbd9ab354..05cbb1d794 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java @@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.flink; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; /** * spark parameters @@ -50,35 +50,35 @@ public class FlinkParameters extends AbstractParameters { private String mainArgs; /** - * slot个数 + * slot count */ private int slot; /** - *Yarn application的名字 + *Yarn application name */ private String appName; /** - * taskManager 数量 + * taskManager count */ private int taskManager; /** - * jobManagerMemory 内存大小 + * job manager memory */ private String jobManagerMemory ; /** - * taskManagerMemory内存大小 + * task manager memory */ private String taskManagerMemory; /** * resource list */ - private List resourceList; + private List resourceList = new ArrayList<>(); /** * The YARN queue to submit to @@ -207,16 +207,11 @@ public class FlinkParameters extends AbstractParameters { @Override - public List getResourceFilesList() { - if(resourceList != null ) { - List resourceFiles = resourceList.stream() - .map(ResourceInfo::getRes).collect(Collectors.toList()); - if(mainJar != null) { - resourceFiles.add(mainJar.getRes()); - } - return resourceFiles; + public List getResourceFilesList() { + if (mainJar != null && !resourceList.contains(mainJar)) { + resourceList.add(mainJar); } - return Collections.emptyList(); + return resourceList; } diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java index 00b01afce3..54284bd8b0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.task.http; import org.apache.dolphinscheduler.common.enums.HttpCheckCondition; import org.apache.dolphinscheduler.common.enums.HttpMethod; import org.apache.dolphinscheduler.common.process.HttpProperty; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.commons.lang.StringUtils; @@ -62,7 +63,7 @@ public class HttpParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { + public List getResourceFilesList() { return new ArrayList<>(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java index 31c9c7292f..5126e82e85 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java @@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.mr; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; public class MapreduceParameters extends AbstractParameters { @@ -54,7 +54,7 @@ public class MapreduceParameters extends AbstractParameters { /** * resource list */ - private List resourceList; + private List resourceList = new ArrayList<>(); /** * program type @@ -125,16 +125,12 @@ public class MapreduceParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { - if(resourceList != null ) { - List resourceFiles = resourceList.stream() - .map(ResourceInfo::getRes).collect(Collectors.toList()); - if(mainJar != null) { - resourceFiles.add(mainJar.getRes()); - } - return resourceFiles; + public List getResourceFilesList() { + if (mainJar != null && !resourceList.contains(mainJar)) { + resourceList.add(mainJar); } - return Collections.emptyList(); + + return resourceList; } @Override diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java index 56ae65547d..2811f10380 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.task.procedure; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import 
org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.commons.lang.StringUtils; @@ -74,7 +75,7 @@ public class ProcedureParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { + public List getResourceFilesList() { return new ArrayList<>(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java index ae9cb4c7da..35dbd8ed86 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import java.util.List; -import java.util.stream.Collectors; public class PythonParameters extends AbstractParameters { /** @@ -56,12 +55,7 @@ public class PythonParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { - if (resourceList != null) { - return resourceList.stream() - .map(p -> p.getRes()).collect(Collectors.toList()); - } - - return null; + public List getResourceFilesList() { + return this.resourceList; } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java index 85b8acb46a..e11e59600b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java @@ -59,12 +59,7 @@ public class ShellParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { - if (resourceList != null) { - return resourceList.stream() - .map(p -> p.getRes()).collect(Collectors.toList()); - } - - return null; + public List getResourceFilesList() { + return resourceList; } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java index 74982d5af9..4e58201bf3 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java @@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.spark; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; /** * spark parameters @@ -78,7 +78,7 @@ public class SparkParameters extends AbstractParameters { /** * resource list */ - private List resourceList; + private List resourceList = new ArrayList<>(); /** * The YARN queue to submit to @@ -219,18 +219,12 @@ public class SparkParameters extends AbstractParameters { return mainJar != null && programType != null && sparkVersion != null; } - @Override - 
public List getResourceFilesList() { - if(resourceList !=null ) { - List resourceFilesList = resourceList.stream() - .map(ResourceInfo::getRes).collect(Collectors.toList()); - if(mainJar != null){ - resourceFilesList.add(mainJar.getRes()); - } - return resourceFilesList; + public List getResourceFilesList() { + if (mainJar != null && !resourceList.contains(mainJar)) { + resourceList.add(mainJar); } - return Collections.emptyList(); + return resourceList; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java index d65204a386..4604234e8f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.task.sql; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.commons.lang.StringUtils; @@ -189,7 +190,7 @@ public class SqlParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { + public List getResourceFilesList() { return new ArrayList<>(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java index fb65df6c1b..7f02f42387 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.task.sqoop; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -111,7 +112,7 @@ public class SqoopParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { + public List getResourceFilesList() { return new ArrayList<>(); } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java index c7784de8dd..46f0e8510c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java @@ -15,6 +15,7 @@ * limitations under the License. 
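// --- Illustrative usage sketch, not part of the patch: getResourceFilesList() now returns
// List<ResourceInfo> (id + res) instead of List<String>, and the Flink/MapReduce/Spark
// variants fold mainJar into the returned list. Assuming the setters exercised elsewhere
// in this diff (setId, setRes, setMainJar); the id and jar name are made up.
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;

class ResourceFilesListUsageSketch {
    static int example() {
        MapreduceParameters mr = new MapreduceParameters();
        ResourceInfo mainJar = new ResourceInfo();
        mainJar.setId(6);                  // made-up resource id
        mainJar.setRes("wordcount.jar");   // made-up jar name
        mr.setMainJar(mainJar);
        // mainJar is appended on the first call, so the returned list now has one entry
        return mr.getResourceFilesList().size();   // 1
    }
}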
*/ package org.apache.dolphinscheduler.common.task.subprocess; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import java.util.ArrayList; @@ -42,7 +43,7 @@ public class SubProcessParameters extends AbstractParameters { } @Override - public List getResourceFilesList() { + public List getResourceFilesList() { return new ArrayList<>(); } } \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 6c42704b47..c89f3c835c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -26,6 +26,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import org.apache.commons.io.IOUtils; +import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; @@ -415,6 +416,22 @@ public class HadoopUtils implements Closeable { } } + /** + * hdfs resource dir + * + * @param tenantCode tenant code + * @return hdfs resource dir + */ + public static String getHdfsDir(ResourceType resourceType,String tenantCode) { + String hdfsDir = ""; + if (resourceType.equals(ResourceType.FILE)) { + hdfsDir = getHdfsResDir(tenantCode); + } else if (resourceType.equals(ResourceType.UDF)) { + hdfsDir = getHdfsUdfDir(tenantCode); + } + return hdfsDir; + } + /** * hdfs resource dir * @@ -450,22 +467,42 @@ public class HadoopUtils implements Closeable { * get absolute path and name for file on hdfs * * @param tenantCode tenant code - * @param filename file name + * @param fileName file name + * @return get absolute path and name for file on hdfs + */ + + /** + * get hdfs file name + * + * @param resourceType resource type + * @param tenantCode tenant code + * @param fileName file name + * @return hdfs file name + */ + public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) { + return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName); + } + + /** + * get absolute path and name for resource file on hdfs + * + * @param tenantCode tenant code + * @param fileName file name * @return get absolute path and name for file on hdfs */ - public static String getHdfsFilename(String tenantCode, String filename) { - return String.format("%s/%s", getHdfsResDir(tenantCode), filename); + public static String getHdfsResourceFileName(String tenantCode, String fileName) { + return String.format("%s/%s", getHdfsResDir(tenantCode), fileName); } /** * get absolute path and name for udf file on hdfs * * @param tenantCode tenant code - * @param filename file name + * @param fileName file name * @return get absolute path and name for udf file on hdfs */ - public static String getHdfsUdfFilename(String tenantCode, String filename) { - return String.format("%s/%s", getHdfsUdfDir(tenantCode), filename); + public static String getHdfsUdfFileName(String tenantCode, String fileName) { + return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName); } /** diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java index 7ce00e875a..cd7b4f2200 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.junit.Assert; import org.junit.Test; @@ -28,8 +29,7 @@ public class FlinkParametersTest { @Test public void getResourceFilesList() { FlinkParameters flinkParameters = new FlinkParameters(); - Assert.assertNotNull(flinkParameters.getResourceFilesList()); - Assert.assertTrue(flinkParameters.getResourceFilesList().isEmpty()); + Assert.assertTrue(CollectionUtils.isEmpty(flinkParameters.getResourceFilesList())); ResourceInfo mainResource = new ResourceInfo(); mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar"); @@ -41,15 +41,17 @@ public class FlinkParametersTest { resourceInfos.add(resourceInfo1); flinkParameters.setResourceList(resourceInfos); - Assert.assertNotNull(flinkParameters.getResourceFilesList()); - Assert.assertEquals(2, flinkParameters.getResourceFilesList().size()); + List resourceFilesList = flinkParameters.getResourceFilesList(); + Assert.assertNotNull(resourceFilesList); + Assert.assertEquals(2, resourceFilesList.size()); ResourceInfo resourceInfo2 = new ResourceInfo(); resourceInfo2.setRes("testFlinkParameters2.jar"); resourceInfos.add(resourceInfo2); flinkParameters.setResourceList(resourceInfos); - Assert.assertNotNull(flinkParameters.getResourceFilesList()); - Assert.assertEquals(3, flinkParameters.getResourceFilesList().size()); + resourceFilesList = flinkParameters.getResourceFilesList(); + Assert.assertNotNull(resourceFilesList); + Assert.assertEquals(3, resourceFilesList.size()); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java index f59d11f3fe..e29de897ef 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java @@ -163,6 +163,11 @@ public class ProcessDefinition { */ private String modifyBy; + /** + * resource ids + */ + private String resourceIds; + public String getName() { return name; @@ -334,6 +339,14 @@ public class ProcessDefinition { this.scheduleReleaseState = scheduleReleaseState; } + public String getResourceIds() { + return resourceIds; + } + + public void setResourceIds(String resourceIds) { + this.resourceIds = resourceIds; + } + public int getTimeout() { return timeout; } @@ -393,6 +406,8 @@ public class ProcessDefinition { ", timeout=" + timeout + ", tenantId=" + tenantId + ", modifyBy='" + modifyBy + '\'' + + ", resourceIds='" + resourceIds + '\'' + '}'; } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java index 934be4ba3d..16d94914fd 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java +++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java @@ -32,11 +32,26 @@ public class Resource { @TableId(value="id", type=IdType.AUTO) private int id; + /** + * parent id + */ + private int pid; + /** * resource alias */ private String alias; + /** + * full name + */ + private String fullName; + + /** + * is directory + */ + private boolean isDirectory=false; + /** * description */ @@ -89,7 +104,15 @@ public class Resource { this.updateTime = updateTime; } - public Resource(String alias, String fileName, String description, int userId, ResourceType type, long size, Date createTime, Date updateTime) { + public Resource(int id, int pid, String alias, String fullName, boolean isDirectory) { + this.id = id; + this.pid = pid; + this.alias = alias; + this.fullName = fullName; + this.isDirectory = isDirectory; + } + + /*public Resource(String alias, String fileName, String description, int userId, ResourceType type, long size, Date createTime, Date updateTime) { this.alias = alias; this.fileName = fileName; this.description = description; @@ -98,6 +121,20 @@ public class Resource { this.size = size; this.createTime = createTime; this.updateTime = updateTime; + }*/ + + public Resource(int pid, String alias, String fullName, boolean isDirectory, String description, String fileName, int userId, ResourceType type, long size, Date createTime, Date updateTime) { + this.pid = pid; + this.alias = alias; + this.fullName = fullName; + this.isDirectory = isDirectory; + this.description = description; + this.fileName = fileName; + this.userId = userId; + this.type = type; + this.size = size; + this.createTime = createTime; + this.updateTime = updateTime; } public int getId() { @@ -116,6 +153,30 @@ public class Resource { this.alias = alias; } + public int getPid() { + return pid; + } + + public void setPid(int pid) { + this.pid = pid; + } + + public String getFullName() { + return fullName; + } + + public void setFullName(String fullName) { + this.fullName = fullName; + } + + public boolean isDirectory() { + return isDirectory; + } + + public void setDirectory(boolean directory) { + isDirectory = directory; + } + public String getFileName() { return fileName; } @@ -177,9 +238,12 @@ public class Resource { public String toString() { return "Resource{" + "id=" + id + + ", pid=" + pid + ", alias='" + alias + '\'' + - ", fileName='" + fileName + '\'' + + ", fullName='" + fullName + '\'' + + ", isDirectory=" + isDirectory + ", description='" + description + '\'' + + ", fileName='" + fileName + '\'' + ", userId=" + userId + ", type=" + type + ", size=" + size + diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java index 9f9225cb04..b75bb58b7d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java @@ -20,9 +20,11 @@ import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.metadata.IPage; +import org.apache.ibatis.annotations.MapKey; import org.apache.ibatis.annotations.Param; import java.util.List; +import java.util.Map; /** * process definition mapper interface @@ -83,7 +85,7 @@ 
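
[Editor's note] To make the new tree fields on Resource concrete: pid links a node to its parent (-1 means root level, per the upgrade DML at the end of this patch), fullName is the absolute path starting with "/", and isDirectory distinguishes folders from files. A minimal mirror of just those fields:

    // Minimal mirror of the tree-related fields added to the Resource entity
    // (pid, fullName, isDirectory); the real entity carries more columns.
    public final class ResourceTreeSketch {

        static final class Node {
            final int id;
            final int pid;          // -1 means "root level", per the upgrade DML
            final String alias;     // display name
            final String fullName;  // absolute path, always starting with "/"
            final boolean directory;

            Node(int id, int pid, String alias, String fullName, boolean directory) {
                this.id = id;
                this.pid = pid;
                this.alias = alias;
                this.fullName = fullName;
                this.directory = directory;
            }
        }

        public static void main(String[] args) {
            Node dir  = new Node(1, -1, "etl", "/etl", true);
            Node file = new Node(2, dir.id, "clean.sql", dir.fullName + "/clean.sql", false);
            System.out.println(file.fullName + " is inside " + dir.fullName);
        }
    }
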
public interface ProcessDefinitionMapper extends BaseMapper { List queryDefinitionListByTenant(@Param("tenantId") int tenantId); /** - * count process definition group by user + * count process definition group by user * @param userId userId * @param projectIds projectIds * @param isAdmin isAdmin @@ -93,4 +95,11 @@ public interface ProcessDefinitionMapper extends BaseMapper { @Param("userId") Integer userId, @Param("projectIds") Integer[] projectIds, @Param("isAdmin") boolean isAdmin); + + /** + * list all resource ids + * @return resource ids list + */ + @MapKey("id") + List> listResources(); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java index cf65e5d08a..f07a92c0a2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java @@ -30,12 +30,12 @@ public interface ResourceMapper extends BaseMapper { /** * query resource list - * @param alias alias + * @param fullName full name * @param userId userId * @param type type * @return resource list */ - List queryResourceList(@Param("alias") String alias, + List queryResourceList(@Param("fullName") String fullName, @Param("userId") int userId, @Param("type") int type); @@ -59,6 +59,7 @@ public interface ResourceMapper extends BaseMapper { */ IPage queryResourcePaging(IPage page, @Param("userId") int userId, + @Param("id") int id, @Param("type") int type, @Param("searchVal") String searchVal); @@ -76,13 +77,13 @@ public interface ResourceMapper extends BaseMapper { */ List queryResourceExceptUserId(@Param("userId") int userId); - /** * query tenant code by name * @param resName resource name + * @param resType resource type * @return tenant code */ - String queryTenantCodeByResourceName(@Param("resName") String resName); + String queryTenantCodeByResourceName(@Param("resName") String resName,@Param("resType") int resType); /** * list authorized resource @@ -91,4 +92,48 @@ public interface ResourceMapper extends BaseMapper { * @return resource list */ List listAuthorizedResource(@Param("userId") int userId,@Param("resNames")T[] resNames); + + /** + * list authorized resource + * @param userId userId + * @param resIds resource ids + * @return resource list + */ + List listAuthorizedResourceById(@Param("userId") int userId,@Param("resIds")T[] resIds); + + /** + * delete resource by id array + * @param resIds resource id array + * @return delete num + */ + int deleteIds(@Param("resIds")Integer[] resIds); + + /** + * list children + * @param direcotyId directory id + * @return resource id array + */ + List listChildren(@Param("direcotyId") int direcotyId); + + /** + * query resource by full name or pid + * @param fullName full name + * @param type resource type + * @return resource + */ + List queryResource(@Param("fullName") String fullName,@Param("type") int type); + + /** + * list resource by id array + * @param resIds resource id array + * @return resource list + */ + List listResourceByIds(@Param("resIds")Integer[] resIds); + + /** + * update resource + * @param resourceList resource list + * @return update num + */ + int batchUpdateResource(@Param("resourceList") List resourceList); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java 
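
[Editor's note] The new listChildren/deleteIds mapper methods only become interesting in combination: removing a directory means first walking down to every descendant and then deleting the whole id set at once. A hedged sketch of that combination; the two-method interface below only mirrors the signatures added by this patch, and the breadth-first walk itself is not part of the diff:

    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;

    // Sketch: cascade-delete a directory by combining the new mapper methods.
    public final class CascadeDeleteSketch {

        interface ResourceTreeDao {
            List<Integer> listChildren(int directoryId); // direct children only
            int deleteIds(Integer[] resIds);             // bulk delete by id
        }

        public static int deleteDirectoryTree(ResourceTreeDao dao, int rootId) {
            Set<Integer> toDelete = new HashSet<>();
            Queue<Integer> pending = new LinkedList<>();
            pending.add(rootId);
            // breadth-first walk: keep expanding children until nothing is left
            while (!pending.isEmpty()) {
                int current = pending.poll();
                if (toDelete.add(current)) {
                    List<Integer> children = dao.listChildren(current);
                    if (children != null) {
                        pending.addAll(children);
                    }
                }
            }
            return dao.deleteIds(toDelete.toArray(new Integer[0]));
        }
    }
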
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java index 5a8734233c..2411c9b178 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java @@ -86,4 +86,19 @@ public interface UdfFuncMapper extends BaseMapper { */ List listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds); + /** + * list UDF by resource id + * @param resourceIds resource id array + * @return UDF function list + */ + List listUdfByResourceId(@Param("resourceIds") int[] resourceIds); + + /** + * list authorized UDF by resource id + * @param resourceIds resource id array + * @return UDF function list + */ + List listAuthorizedUdfByResourceId(@Param("userId") int userId,@Param("resourceIds") int[] resourceIds); + + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml index f2157783e8..c9086b9f83 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml @@ -87,4 +87,11 @@ pd.user_id = u.id AND pd.project_id = p.id AND pd.id = #{processDefineId} + + + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml index 2146d1ac20..c1fe50fd47 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml @@ -22,8 +22,8 @@ select * from t_ds_resources where 1= 1 - - and alias = #{alias} + + and full_name = #{fullName} and type = #{type} @@ -47,8 +47,8 @@ select tenant_code from t_ds_tenant t, t_ds_user u, t_ds_resources res - where t.id = u.tenant_id and u.id = res.user_id and res.type=0 - and res.alias= #{resName} + where t.id = u.tenant_id and u.id = res.user_id and res.type=#{resType} + and res.full_name= #{resName} + + + + + + delete from t_ds_resources where id in + + #{i} + + + + + + + + + + update t_ds_resources + + full_name=#{resource.fullName}, + update_time=#{resource.updateTime} + + + id=#{resource.id} + + + + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml index 0aa10607c4..e38d1637d6 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml @@ -87,4 +87,28 @@ + + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java index 01082414a9..6a2aea5ad2 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java +++ 
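
[Editor's note] batchUpdateResource in ResourceMapper.xml rewrites full_name and update_time for a list of resources; the use case is a directory rename or move, where every descendant's full_name needs the new prefix. A sketch of the prefix rewrite that would feed such a batch update (Item stands in for the real Resource entity):

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;

    // Collect the descendants whose full_name changes after a directory rename;
    // the returned list is what resourceMapper.batchUpdateResource(...) would persist.
    public final class RenamePrefixSketch {

        static final class Item {
            String fullName;
            Date updateTime;
            Item(String fullName) { this.fullName = fullName; }
        }

        public static List<Item> rewritePrefix(List<Item> descendants, String oldPrefix, String newPrefix) {
            List<Item> changed = new ArrayList<>();
            for (Item item : descendants) {
                if (item.fullName.equals(oldPrefix) || item.fullName.startsWith(oldPrefix + "/")) {
                    item.fullName = newPrefix + item.fullName.substring(oldPrefix.length());
                    item.updateTime = new Date();
                    changed.add(item);
                }
            }
            return changed;
        }
    }
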
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java @@ -34,6 +34,7 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; +import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -68,7 +69,10 @@ public class ResourceMapperTest { private Resource insertOne(){ //insertOne Resource resource = new Resource(); - resource.setAlias("ut resource"); + resource.setAlias("ut-resource"); + resource.setFullName("/ut-resource"); + resource.setPid(-1); + resource.setDirectory(false); resource.setType(ResourceType.FILE); resource.setUserId(111); resourceMapper.insert(resource); @@ -80,16 +84,32 @@ public class ResourceMapperTest { * @param user user * @return Resource */ - private Resource createResource(User user){ + private Resource createResource(User user,boolean isDirectory,ResourceType resourceType,int pid,String alias,String fullName){ //insertOne Resource resource = new Resource(); - resource.setAlias(String.format("ut resource %s",user.getUserName())); - resource.setType(ResourceType.FILE); + resource.setDirectory(isDirectory); + resource.setType(resourceType); + resource.setAlias(alias); + resource.setFullName(fullName); resource.setUserId(user.getId()); resourceMapper.insert(resource); return resource; } + /** + * create resource by user + * @param user user + * @return Resource + */ + private Resource createResource(User user){ + //insertOne + String alias = String.format("ut-resource-%s",user.getUserName()); + String fullName = String.format("/%s",alias); + + Resource resource = createResource(user, false, ResourceType.FILE, -1, alias, fullName); + return resource; + } + /** * create user * @return User @@ -200,13 +220,15 @@ public class ResourceMapperTest { IPage resourceIPage = resourceMapper.queryResourcePaging( page, - resource.getUserId(), + 0, + -1, resource.getType().ordinal(), "" ); IPage resourceIPage1 = resourceMapper.queryResourcePaging( page, 1110, + -1, resource.getType().ordinal(), "" ); @@ -289,7 +311,7 @@ public class ResourceMapperTest { resourceMapper.updateById(resource); String resource1 = resourceMapper.queryTenantCodeByResourceName( - resource.getAlias() + resource.getFullName(),ResourceType.FILE.ordinal() ); @@ -305,22 +327,37 @@ public class ResourceMapperTest { User generalUser2 = createGeneralUser("user2"); // create one resource Resource resource = createResource(generalUser2); - Resource unauthorizedResource = createResource(generalUser2); + Resource unauthorizedResource = createResource(generalUser1); // need download resources - String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()}; + String[] resNames = new String[]{resource.getFullName(), unauthorizedResource.getFullName()}; List resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames); Assert.assertEquals(generalUser2.getId(),resource.getUserId()); - Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); + Assert.assertFalse(resources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames))); // authorize object unauthorizedResource to generalUser createResourcesUser(unauthorizedResource,generalUser2); List authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames); - 
Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); + Assert.assertTrue(authorizedResources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames))); + + } + + @Test + public void deleteIdsTest(){ + // create a general user + User generalUser1 = createGeneralUser("user1"); + + Resource resource = createResource(generalUser1); + Resource resource1 = createResource(generalUser1); + List resourceList = new ArrayList<>(); + resourceList.add(resource.getId()); + resourceList.add(resource1.getId()); + int result = resourceMapper.deleteIds(resourceList.toArray(new Integer[resourceList.size()])); + Assert.assertEquals(result,2); } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 48048e7eba..c7806f1b66 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -23,15 +23,18 @@ import com.alibaba.fastjson.JSON; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator; @@ -96,7 +99,7 @@ public class TaskScheduleThread implements Runnable { TaskNode taskNode = JSON.parseObject(taskInstance.getTaskJson(), TaskNode.class); // get resource files - List resourceFiles = createProjectResFiles(taskNode); + List resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local downloadResource( taskInstance.getExecutePath(), @@ -165,6 +168,7 @@ public class TaskScheduleThread implements Runnable { new Date(), taskInstance.getId()); } + /** * get global paras map * @return @@ -289,14 +293,16 @@ public class TaskScheduleThread implements Runnable { /** * create project resource files */ - private List createProjectResFiles(TaskNode taskNode) throws Exception{ + private List createProjectResFiles(TaskNode taskNode) throws Exception{ - Set projectFiles = new HashSet<>(); + Set projectFiles = new HashSet<>(); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { - List projectResourceFiles = baseParam.getResourceFilesList(); - 
projectFiles.addAll(projectResourceFiles); + List projectResourceFiles = baseParam.getResourceFilesList(); + if (projectResourceFiles != null) { + projectFiles.addAll(projectResourceFiles); + } } return new ArrayList<>(projectFiles); @@ -309,18 +315,25 @@ public class TaskScheduleThread implements Runnable { * @param projectRes * @param logger */ - private void downloadResource(String execLocalPath, List projectRes, Logger logger) throws Exception { + private void downloadResource(String execLocalPath, List projectRes, Logger logger) throws Exception { checkDownloadPermission(projectRes); - for (String res : projectRes) { - File resFile = new File(execLocalPath, res); + String resourceName; + for (ResourceInfo res : projectRes) { + if (res.getId() != 0) { + Resource resource = processService.getResourceById(res.getId()); + resourceName = resource.getFullName(); + }else{ + resourceName = res.getRes(); + } + File resFile = new File(execLocalPath, resourceName); if (!resFile.exists()) { try { // query the tenant code of the resource according to the name of the resource - String tentnCode = processService.queryTenantCodeByResName(res); - String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode, res); + String tentnCode = processService.queryTenantCodeByResName(resourceName, ResourceType.FILE); + String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tentnCode, resourceName); logger.info("get resource file from hdfs :{}", resHdfsPath); - HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + res, false, true); + HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resourceName, false, true); }catch (Exception e){ logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage()); @@ -336,10 +349,17 @@ public class TaskScheduleThread implements Runnable { * @param projectRes resource name list * @throws Exception exception */ - private void checkDownloadPermission(List projectRes) throws Exception { + private void checkDownloadPermission(List projectRes) throws Exception { + int userId = taskInstance.getProcessInstance().getExecutorId(); - String[] resNames = projectRes.toArray(new String[projectRes.size()]); - PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); - permissionCheck.checkPermission(); + if (projectRes.stream().allMatch(t->t.getId() == 0)) { + String[] resNames = projectRes.stream().map(t -> t.getRes()).collect(Collectors.toList()).toArray(new String[projectRes.size()]); + PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_NAME,processService,resNames,userId,logger); + permissionCheck.checkPermission(); + }else{ + Integer[] resIds = projectRes.stream().map(t -> t.getId()).collect(Collectors.toList()).toArray(new Integer[projectRes.size()]); + PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resIds,userId,logger); + permissionCheck.checkPermission(); + } } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java index 39f4dfbb97..cda12ca525 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java +++ 
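
[Editor's note] The worker now handles two kinds of resource references: legacy task definitions that only carry a name (id == 0) and newer ones that carry the resource id, and checkDownloadPermission picks RESOURCE_FILE_NAME or RESOURCE_FILE_ID accordingly. A stripped-down model of that dispatch (Ref mirrors the two relevant ResourceInfo fields):

    import java.util.List;
    import java.util.stream.Collectors;

    // Model of the id-vs-name dispatch the worker does before checking download permission.
    public final class ResourceRefSketch {

        static final class Ref {
            final int id;       // 0 for legacy, name-only references
            final String name;  // resource (full) name
            Ref(int id, String name) { this.id = id; this.name = name; }
        }

        static String describeCheck(List<Ref> refs) {
            boolean allLegacy = refs.stream().allMatch(r -> r.id == 0);
            if (allLegacy) {
                // old style: authorize by full name (RESOURCE_FILE_NAME)
                return "check by name: " + refs.stream().map(r -> r.name).collect(Collectors.joining(","));
            }
            // new style: authorize by id (RESOURCE_FILE_ID)
            return "check by id: " + refs.stream().map(r -> String.valueOf(r.id)).collect(Collectors.joining(","));
        }
    }

Worth keeping in mind: as written in the patch, a list that mixes id-based and name-only references falls into the id branch, so the name-only entries end up being checked by an id of 0 when old and new task definitions coexist.
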
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java @@ -94,4 +94,9 @@ public abstract class AbstractYarnTask extends AbstractTask { * @throws Exception exception */ protected abstract String buildCommand() throws Exception; + + /** + * set main jar name + */ + protected abstract void setMainJarName(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index c562fbe4dd..0dc7c6a638 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -17,12 +17,14 @@ package org.apache.dolphinscheduler.server.worker.task.flink; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; @@ -63,6 +65,7 @@ public class FlinkTask extends AbstractYarnTask { if (!flinkParameters.checkParameters()) { throw new RuntimeException("flink task params is not valid"); } + setMainJarName(); flinkParameters.setQueue(taskProps.getQueue()); if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { @@ -111,6 +114,28 @@ public class FlinkTask extends AbstractYarnTask { return command; } + @Override + protected void setMainJarName() { + // main jar + ResourceInfo mainJar = flinkParameters.getMainJar(); + if (mainJar != null) { + int resourceId = mainJar.getId(); + String resourceName; + if (resourceId == 0) { + resourceName = mainJar.getRes(); + } else { + Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId()); + if (resource == null) { + logger.error("resource id: {} not exist", resourceId); + throw new RuntimeException(String.format("resource id: %d not exist", resourceId)); + } + resourceName = resource.getFullName().replaceFirst("/", ""); + } + mainJar.setRes(resourceName); + flinkParameters.setMainJar(mainJar); + } + } + @Override public AbstractParameters getParameters() { return flinkParameters; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index 7f6baad427..0909fbd06e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -19,11 +19,13 @@ package org.apache.dolphinscheduler.server.worker.task.mr; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; import 
org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -64,7 +66,7 @@ public class MapReduceTask extends AbstractYarnTask { if (!mapreduceParameters.checkParameters()) { throw new RuntimeException("mapreduce task params is not valid"); } - + setMainJarName(); mapreduceParameters.setQueue(taskProps.getQueue()); // replace placeholder @@ -99,6 +101,28 @@ public class MapReduceTask extends AbstractYarnTask { return command; } + @Override + protected void setMainJarName() { + // main jar + ResourceInfo mainJar = mapreduceParameters.getMainJar(); + if (mainJar != null) { + int resourceId = mainJar.getId(); + String resourceName; + if (resourceId == 0) { + resourceName = mainJar.getRes(); + } else { + Resource resource = processService.getResourceById(mapreduceParameters.getMainJar().getId()); + if (resource == null) { + logger.error("resource id: {} not exist", resourceId); + throw new RuntimeException(String.format("resource id: %d not exist", resourceId)); + } + resourceName = resource.getFullName().replaceFirst("/", ""); + } + mainJar.setRes(resourceName); + mapreduceParameters.setMainJar(mainJar); + } + } + @Override public AbstractParameters getParameters() { return mapreduceParameters; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index 203c0fe146..d2a8674146 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -18,11 +18,13 @@ package org.apache.dolphinscheduler.server.worker.task.spark; import org.apache.dolphinscheduler.common.enums.SparkVersion; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.SparkArgsUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; @@ -67,8 +69,8 @@ public class SparkTask extends AbstractYarnTask { if (!sparkParameters.checkParameters()) { throw new RuntimeException("spark task params is not valid"); } + setMainJarName(); sparkParameters.setQueue(taskProps.getQueue()); - if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) { String args = sparkParameters.getMainArgs(); @@ -115,6 +117,28 @@ 
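
[Editor's note] FlinkTask, SparkTask and MapReduceTask each gain an identical setMainJarName() override; a natural follow-up would be to pull the shared logic up into AbstractYarnTask. The sketch below shows what that shared helper could look like. MainJarHolder, ResourceRef and ResourceLookup are invented stand-ins for the *Parameters classes, ResourceInfo and processService.getResourceById; none of them are abstracted this way in the patch itself:

    // Possible shared main-jar resolution, sketched under the assumptions above.
    public final class MainJarNameSketch {

        interface MainJarHolder {
            ResourceRef getMainJar();
        }

        static final class ResourceRef {
            int id;          // 0 means "referenced by name only"
            String res;      // resource name as stored in the task definition
        }

        interface ResourceLookup {
            String fullNameById(int resourceId); // e.g. "/libs/app-1.0.jar", null if missing
        }

        static void setMainJarName(MainJarHolder params, ResourceLookup lookup) {
            ResourceRef mainJar = params.getMainJar();
            if (mainJar == null) {
                return;
            }
            if (mainJar.id != 0) {
                String fullName = lookup.fullNameById(mainJar.id);
                if (fullName == null) {
                    throw new RuntimeException(String.format("resource id: %d not exist", mainJar.id));
                }
                // drop the leading "/" so the jar path is relative to the execute dir
                mainJar.res = fullName.replaceFirst("/", "");
            }
        }
    }
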
public class SparkTask extends AbstractYarnTask { return command; } + @Override + protected void setMainJarName() { + // main jar + ResourceInfo mainJar = sparkParameters.getMainJar(); + if (mainJar != null) { + int resourceId = mainJar.getId(); + String resourceName; + if (resourceId == 0) { + resourceName = mainJar.getRes(); + } else { + Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId()); + if (resource == null) { + logger.error("resource id: {} not exist", resourceId); + throw new RuntimeException(String.format("resource id: %d not exist", resourceId)); + } + resourceName = resource.getFullName().replaceFirst("/", ""); + } + mainJar.setRes(resourceName); + sparkParameters.setMainJar(mainJar); + } + } + @Override public AbstractParameters getParameters() { return sparkParameters; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java index 64bc7924d2..5c701dcd52 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java @@ -71,6 +71,10 @@ public class SqoopTask extends AbstractYarnTask { return null; } + @Override + protected void setMainJarName() { + } + @Override public AbstractParameters getParameters() { return sqoopParameters; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java index e53fae6e86..9f93f4ce3e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.service.permission; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -45,6 +46,11 @@ public class PermissionCheck { */ private T[] needChecks; + /** + * resoruce info + */ + private List resourceList; + /** * user id */ @@ -90,6 +96,22 @@ public class PermissionCheck { this.logger = logger; } + /** + * permission check + * @param logger + * @param authorizationType + * @param processService + * @param resourceList + * @param userId + */ + public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List resourceList, int userId,Logger logger) { + this.authorizationType = authorizationType; + this.processService = processService; + this.resourceList = resourceList; + this.userId = userId; + this.logger = logger; + } + public AuthorizationType getAuthorizationType() { return authorizationType; } @@ -122,6 +144,14 @@ public class PermissionCheck { this.userId = userId; } + public List getResourceList() { + return resourceList; + } + + public void setResourceList(List resourceList) { + this.resourceList = resourceList; + } + /** * has permission * @return true if has permission @@ -141,6 +171,7 @@ public 
class PermissionCheck { */ public void checkPermission() throws Exception{ if(this.needChecks.length > 0){ + // get user type in order to judge whether the user is admin User user = processService.getUserById(userId); if (user.getUserType() != UserType.ADMIN_USER){ diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index c848ec5197..3312c1004a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1556,10 +1556,11 @@ public class ProcessService { /** * find tenant code by resource name * @param resName resource name + * @param resourceType resource type * @return tenant code */ - public String queryTenantCodeByResName(String resName){ - return resourceMapper.queryTenantCodeByResourceName(resName); + public String queryTenantCodeByResName(String resName,ResourceType resourceType){ + return resourceMapper.queryTenantCodeByResourceName(resName,resourceType.ordinal()); } /** @@ -1791,10 +1792,18 @@ public class ProcessService { Set originResSet = new HashSet(Arrays.asList(needChecks)); switch (authorizationType){ - case RESOURCE_FILE: - Set authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getAlias()).collect(toSet()); + case RESOURCE_FILE_ID: + Set authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet()); + originResSet.removeAll(authorizedResourceFiles); + break; + case RESOURCE_FILE_NAME: + Set authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getFullName()).collect(toSet()); originResSet.removeAll(authorizedResources); break; + case UDF_FILE: + Set authorizedUdfFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet()); + originResSet.removeAll(authorizedUdfFiles); + break; case DATASOURCE: Set authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet()); originResSet.removeAll(authorizedDatasources); @@ -1820,5 +1829,14 @@ public class ProcessService { return userMapper.queryDetailsById(userId); } + /** + * get resource by resoruce id + * @param resoruceId resource id + * @return Resource + */ + public Resource getResourceById(int resoruceId){ + return resourceMapper.selectById(resoruceId); + } + } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js index 9412a8cf38..6a17239e65 100755 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js @@ -724,7 +724,7 @@ JSP.prototype.handleEvent = function () { } else { $(`#${sourceId}`).attr('data-nodenumber',Number($(`#${sourceId}`).attr('data-nodenumber'))+1) } - + // Storage node dependency information saveTargetarr(sourceId, targetId) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js old mode 100644 new mode 100755 diff --git 
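
[Editor's note] Every branch of the extended checkNeedPermissions switch follows the same pattern: collect what the task needs, subtract what the user is authorized for, and treat any remainder as a permission failure. A minimal standalone version of that set-difference check:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // Set-difference permission pattern used by the RESOURCE_FILE_ID / RESOURCE_FILE_NAME /
    // UDF_FILE / DATASOURCE branches: whatever is left over is unauthorized.
    public final class PermissionDiffSketch {

        /** returns the ids the user is NOT allowed to use (empty = permission granted) */
        public static Set<Integer> unauthorized(Integer[] needChecks, Set<Integer> authorized) {
            Set<Integer> origin = new HashSet<>(Arrays.asList(needChecks));
            origin.removeAll(authorized);
            return origin;
        }

        public static void main(String[] args) {
            Set<Integer> left = unauthorized(new Integer[]{3, 7, 12}, new HashSet<>(Arrays.asList(3, 12)));
            System.out.println(left.isEmpty() ? "permission ok" : "no permission for " + left);
        }
    }
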
a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue old mode 100644 new mode 100755 diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue index b082f883fb..ad33503532 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue @@ -116,4 +116,4 @@ }, components: { mPopup, mListBoxF } } - \ No newline at end of file + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue old mode 100644 new mode 100755 diff --git a/dolphinscheduler-ui/src/js/conf/home/store/resource/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/resource/actions.js old mode 100644 new mode 100755 diff --git a/dolphinscheduler-ui/src/js/module/components/fileUpdate/fileUpdate.vue b/dolphinscheduler-ui/src/js/module/components/fileUpdate/fileUpdate.vue old mode 100644 new mode 100755 diff --git a/sql/soft_version b/sql/soft_version index 867e52437a..d2d61a7e8e 100644 --- a/sql/soft_version +++ b/sql/soft_version @@ -1 +1 @@ -1.2.0 \ No newline at end of file +1.2.2 \ No newline at end of file diff --git a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql index 049484ce3a..f960d5ce49 100644 --- a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql @@ -74,4 +74,84 @@ d// delimiter ; CALL uc_dolphin_T_t_ds_task_instance_C_app_link; -DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_C_app_link; \ No newline at end of file +DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_C_app_link; + +-- ac_dolphin_T_t_ds_resources_A_pid +drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_A_pid; +delimiter d// +CREATE PROCEDURE ac_dolphin_T_t_ds_resources_A_pid() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_resources' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='pid') + THEN + ALTER TABLE t_ds_resources ADD `pid` int(11) DEFAULT -1 COMMENT 'parent id'; + END IF; + END; + +d// + +delimiter ; +CALL ac_dolphin_T_t_ds_resources_A_pid; +DROP PROCEDURE ac_dolphin_T_t_ds_resources_A_pid; + +-- ac_dolphin_T_t_ds_resources_A_full_name +drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_A_full_name; +delimiter d// +CREATE PROCEDURE ac_dolphin_T_t_ds_resources_A_full_name() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_resources' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='full_name') + THEN + ALTER TABLE t_ds_resources ADD `full_name` varchar(255) DEFAULT NULL COMMENT 'full name'; + END IF; + END; + +d// + +delimiter ; +CALL ac_dolphin_T_t_ds_resources_A_full_name; +DROP PROCEDURE ac_dolphin_T_t_ds_resources_A_full_name; + +-- ac_dolphin_T_t_ds_resources_A_pid +drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_is_directory; +delimiter d// +CREATE PROCEDURE ac_dolphin_T_t_ds_resources_is_directory() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_resources' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME 
='is_directory') + THEN + ALTER TABLE t_ds_resources ADD `is_directory` tinyint(1) DEFAULT 0 COMMENT 'is directory'; + END IF; + END; + +d// + +delimiter ; +CALL ac_dolphin_T_t_ds_resources_is_directory; +DROP PROCEDURE ac_dolphin_T_t_ds_resources_is_directory; + +-- ac_dolphin_T_t_ds_process_definition_A_resource_ids +drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_process_definition_A_resource_ids; +delimiter d// +CREATE PROCEDURE ac_dolphin_T_t_ds_process_definition_A_resource_ids() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_process_definition' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='resource_ids') + THEN + ALTER TABLE t_ds_process_definition ADD `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource ids'; + END IF; + END; + +d// + +delimiter ; +CALL ac_dolphin_T_t_ds_process_definition_A_resource_ids; +DROP PROCEDURE ac_dolphin_T_t_ds_process_definition_A_resource_ids; \ No newline at end of file diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql index b1e0fd941c..9b5f15b8ae 100644 --- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -66,4 +66,87 @@ d// delimiter ; SELECT uc_dolphin_T_t_ds_task_instance_C_app_link(); -DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_C_app_link(); \ No newline at end of file +DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_C_app_link(); + + +-- ac_dolphin_T_t_ds_resources_A_pid +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_pid() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_resources' + AND COLUMN_NAME ='pid') + THEN + ALTER TABLE t_ds_resources ADD COLUMN pid int DEFAULT -1; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_resources_A_pid(); +DROP FUNCTION ac_dolphin_T_t_ds_resources_A_pid(); + +-- ac_dolphin_T_t_ds_resources_A_full_name +delimiter ; +DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_resources_A_full_name(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_full_name() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_resources' + AND COLUMN_NAME ='full_name') + THEN + ALTER TABLE t_ds_resources ADD COLUMN full_name varchar(255) DEFAULT null; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_resources_A_full_name(); +DROP FUNCTION ac_dolphin_T_t_ds_resources_A_full_name(); + +-- ac_dolphin_T_t_ds_resources_A_is_directory +delimiter ; +DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_resources_A_is_directory(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_is_directory() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_resources' + AND COLUMN_NAME ='is_directory') + THEN + ALTER TABLE t_ds_resources ADD COLUMN is_directory boolean DEFAULT false; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_resources_A_is_directory(); +DROP FUNCTION ac_dolphin_T_t_ds_resources_A_is_directory(); + +-- ac_dolphin_T_t_ds_process_definition_A_resource_ids +delimiter ; +DROP FUNCTION 
IF EXISTS ac_dolphin_T_t_ds_process_definition_A_resource_ids(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_process_definition_A_resource_ids() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_process_definition' + AND COLUMN_NAME ='resource_ids') + THEN + ALTER TABLE t_ds_process_definition ADD COLUMN resource_ids varchar(255) DEFAULT null; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_process_definition_A_resource_ids(); +DROP FUNCTION ac_dolphin_T_t_ds_process_definition_A_resource_ids(); diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql index 38964cc551..f892fa8c91 100644 --- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql +++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql @@ -13,4 +13,6 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ \ No newline at end of file +*/ +UPDATE t_ds_resources SET pid=-1,is_directory=false WHERE pid IS NULL; +UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL; \ No newline at end of file
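
[Editor's note] The two UPDATE statements at the very end backfill pre-upgrade rows so the new tree columns stay consistent: every legacy resource becomes a root-level node (pid = -1, is_directory = false) with full_name = '/' + alias. A tiny model of that backfill; Row is a stand-in for a t_ds_resources record, not the real entity:

    // Sanity model of the 1.2.2 data backfill for a legacy flat resource.
    public final class BackfillSketch {

        static final class Row {
            String alias;
            Integer pid;        // null before the upgrade
            Boolean directory;  // null before the upgrade
            String fullName;    // null before the upgrade
        }

        static void backfill(Row r) {
            if (r.pid == null) {                     // UPDATE ... SET pid=-1, is_directory=false WHERE pid IS NULL
                r.pid = -1;
                r.directory = Boolean.FALSE;
            }
            if (r.pid == -1 && r.fullName == null) { // UPDATE ... SET full_name = concat('/', alias) WHERE pid=-1
                r.fullName = "/" + r.alias;
            }
        }

        public static void main(String[] args) {
            Row legacy = new Row();
            legacy.alias = "wordcount.jar";
            backfill(legacy);
            System.out.println(legacy.fullName); // "/wordcount.jar"
        }
    }
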