break60 4 years ago
parent
commit
2aa4ac8884
  1. 152
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
  2. 29
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java
  3. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java
  4. 193
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java
  5. 28
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java
  6. 100
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java
  7. 130
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java
  8. 31
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java
  9. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  10. 34
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  11. 559
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  12. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
  13. 58
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java
  14. 82
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java
  15. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/enums/StatusTest.java
  16. 112
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  17. 7
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
  18. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java
  19. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
  20. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
  21. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java
  22. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java
  23. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
  24. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java
  25. 29
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
  26. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
  27. 20
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java
  28. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
  29. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java
  30. 9
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
  31. 16
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
  32. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
  33. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
  34. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java
  35. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
  36. 49
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  37. 14
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java
  38. 6
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringTest.java
  39. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
  40. 68
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java
  41. 11
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
  42. 53
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java
  43. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java
  44. 7
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
  45. 79
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml
  46. 24
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml
  47. 57
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java
  48. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java
  49. 50
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
  50. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
  51. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
  52. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
  53. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  54. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
  55. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  56. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
  57. 60
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
  58. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java
  59. 31
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
  60. 26
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  61. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js
  62. 0
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js
  63. 0
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue
  64. 2
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue
  65. 0
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue
  66. 0
      dolphinscheduler-ui/src/js/conf/home/store/resource/actions.js
  67. 0
      dolphinscheduler-ui/src/js/module/components/fileUpdate/fileUpdate.vue
  68. 2
      sql/soft_version
  69. 82
      sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
  70. 85
      sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql
  71. 4
      sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql

152
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java

@ -60,6 +60,50 @@ public class ResourcesController extends BaseController{
@Autowired
private UdfFuncService udfFuncService;
/**
* create resource
*
* @param loginUser login user
* @param alias alias
* @param description description
* @param type type
* @return create result code
*/
/**
*
* @param loginUser login user
* @param type type
* @param alias alias
* @param description description
* @param pid parent id
* @param currentDir current directory
* @return
*/
@ApiOperation(value = "createDirctory", notes= "CREATE_RESOURCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
@ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType ="String"),
@ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType ="String"),
@ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required = true, dataType = "MultipartFile")
})
@PostMapping(value = "/directory/create")
public Result createDirectory(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "type") ResourceType type,
@RequestParam(value ="name") String alias,
@RequestParam(value = "description", required = false) String description,
@RequestParam(value ="pid") int pid,
@RequestParam(value ="currentDir") String currentDir) {
try {
logger.info("login user {}, create resource, type: {}, resource alias: {}, desc: {}, file: {},{}",
loginUser.getUserName(),type, alias, description,pid,currentDir);
return resourceService.createDirectory(loginUser,alias, description,type ,pid,currentDir);
} catch (Exception e) {
logger.error(CREATE_RESOURCE_ERROR.getMsg(),e);
return error(CREATE_RESOURCE_ERROR.getCode(), CREATE_RESOURCE_ERROR.getMsg());
}
}
/**
* create resource
*
@ -80,13 +124,15 @@ public class ResourcesController extends BaseController{
@PostMapping(value = "/create")
public Result createResource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "type") ResourceType type,
@RequestParam(value ="name")String alias,
@RequestParam(value ="name") String alias,
@RequestParam(value = "description", required = false) String description,
@RequestParam("file") MultipartFile file) {
@RequestParam("file") MultipartFile file,
@RequestParam(value ="pid") int pid,
@RequestParam(value ="currentDir") String currentDir) {
try {
logger.info("login user {}, create resource, type: {}, resource alias: {}, desc: {}, file: {},{}",
loginUser.getUserName(),type, alias, description, file.getName(), file.getOriginalFilename());
return resourceService.createResource(loginUser,alias, description,type ,file);
return resourceService.createResource(loginUser,alias, description,type ,file,pid,currentDir);
} catch (Exception e) {
logger.error(CREATE_RESOURCE_ERROR.getMsg(),e);
return error(CREATE_RESOURCE_ERROR.getCode(), CREATE_RESOURCE_ERROR.getMsg());
@ -120,7 +166,7 @@ public class ResourcesController extends BaseController{
try {
logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}",
loginUser.getUserName(),type, alias, description);
return resourceService.updateResource(loginUser,resourceId,alias, description,type);
return resourceService.updateResource(loginUser,resourceId,alias,description,type);
} catch (Exception e) {
logger.error(UPDATE_RESOURCE_ERROR.getMsg(),e);
return error(Status.UPDATE_RESOURCE_ERROR.getCode(), Status.UPDATE_RESOURCE_ERROR.getMsg());
@ -166,6 +212,7 @@ public class ResourcesController extends BaseController{
@ApiOperation(value = "queryResourceListPaging", notes= "QUERY_RESOURCE_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
@ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType ="int"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType ="String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType ="Int",example = "20")
@ -174,6 +221,7 @@ public class ResourcesController extends BaseController{
@ResponseStatus(HttpStatus.OK)
public Result queryResourceListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value ="type") ResourceType type,
@RequestParam(value ="id") int id,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam("pageSize") Integer pageSize
@ -187,7 +235,7 @@ public class ResourcesController extends BaseController{
}
searchVal = ParameterUtils.handleEscapes(searchVal);
result = resourceService.queryResourceListPaging(loginUser,type,searchVal,pageNo, pageSize);
result = resourceService.queryResourceListPaging(loginUser,id,type,searchVal,pageNo, pageSize);
return returnDataListPaging(result);
}catch (Exception e){
logger.error(QUERY_RESOURCES_LIST_PAGING.getMsg(),e);
@ -227,32 +275,89 @@ public class ResourcesController extends BaseController{
* verify resource by alias and type
*
* @param loginUser login user
* @param alias resource name
* @param type resource type
* @param fullName resource full name
* @param type resource type
* @return true if the resource name not exists, otherwise return false
*/
@ApiOperation(value = "verifyResourceName", notes= "VERIFY_RESOURCE_NAME_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
@ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType ="String")
@ApiImplicitParam(name = "fullName", value = "RESOURCE_FULL_NAME", required = true, dataType ="String")
})
@GetMapping(value = "/verify-name")
@ResponseStatus(HttpStatus.OK)
public Result verifyResourceName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value ="name") String alias,
@RequestParam(value ="fullName") String fullName,
@RequestParam(value ="type") ResourceType type
) {
try {
logger.info("login user {}, verfiy resource alias: {},resource type: {}",
loginUser.getUserName(), alias,type);
loginUser.getUserName(), fullName,type);
return resourceService.verifyResourceName(alias,type,loginUser);
return resourceService.verifyResourceName(fullName,type,loginUser);
} catch (Exception e) {
logger.error(VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg(), e);
return error(Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getCode(), Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg());
}
}
/**
* query resources jar list
*
* @param loginUser login user
* @param type resource type
* @return resource list
*/
@ApiOperation(value = "queryResourceJarList", notes= "QUERY_RESOURCE_LIST_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType")
})
@GetMapping(value="/list/jar")
@ResponseStatus(HttpStatus.OK)
public Result queryResourceJarList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value ="type") ResourceType type
){
try{
logger.info("query resource list, login user:{}, resource type:{}", loginUser.getUserName(), type.toString());
Map<String, Object> result = resourceService.queryResourceJarList(loginUser, type);
return returnDataList(result);
}catch (Exception e){
logger.error(QUERY_RESOURCES_LIST_ERROR.getMsg(),e);
return error(Status.QUERY_RESOURCES_LIST_ERROR.getCode(), Status.QUERY_RESOURCES_LIST_ERROR.getMsg());
}
}
/**
* query resource by full name and type
*
* @param loginUser login user
* @param fullName resource full name
* @param type resource type
* @return true if the resource name not exists, otherwise return false
*/
@ApiOperation(value = "queryResource", notes= "QUERY_BY_RESOURCE_NAME")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
@ApiImplicitParam(name = "fullName", value = "RESOURCE_FULL_NAME", required = true, dataType ="String")
})
@GetMapping(value = "/queryResource")
@ResponseStatus(HttpStatus.OK)
public Result queryResource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value ="fullName",required = false) String fullName,
@RequestParam(value ="id",required = false) Integer id,
@RequestParam(value ="type") ResourceType type
) {
try {
logger.info("login user {}, query resource by full name: {} or id: {},resource type: {}",
loginUser.getUserName(), fullName,id,type);
return resourceService.queryResource(fullName,id,type);
} catch (Exception e) {
logger.error(RESOURCE_NOT_EXIST.getMsg(), e);
return error(Status.RESOURCE_NOT_EXIST.getCode(), Status.RESOURCE_NOT_EXIST.getMsg());
}
}
/**
* view resource file online
*
@ -310,16 +415,18 @@ public class ResourcesController extends BaseController{
@RequestParam(value ="fileName")String fileName,
@RequestParam(value ="suffix")String fileSuffix,
@RequestParam(value = "description", required = false) String description,
@RequestParam(value = "content") String content
@RequestParam(value = "content") String content,
@RequestParam(value ="pid") int pid,
@RequestParam(value ="currentDir") String currentDir
) {
try{
logger.info("login user {}, online create resource! fileName : {}, type : {}, suffix : {},desc : {},content : {}",
loginUser.getUserName(),fileName,type,fileSuffix,description,content);
loginUser.getUserName(),fileName,type,fileSuffix,description,content,pid,currentDir);
if(StringUtils.isEmpty(content)){
logger.error("resource file contents are not allowed to be empty");
return error(Status.RESOURCE_FILE_IS_EMPTY.getCode(), RESOURCE_FILE_IS_EMPTY.getMsg());
}
return resourceService.onlineCreateResource(loginUser,type,fileName,fileSuffix,description,content);
return resourceService.onlineCreateResource(loginUser,type,fileName,fileSuffix,description,content,pid,currentDir);
}catch (Exception e){
logger.error(CREATE_RESOURCE_FILE_ON_LINE_ERROR.getMsg(),e);
return error(Status.CREATE_RESOURCE_FILE_ON_LINE_ERROR.getCode(), Status.CREATE_RESOURCE_FILE_ON_LINE_ERROR.getMsg());
@ -384,6 +491,9 @@ public class ResourcesController extends BaseController{
.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getFilename() + "\"")
.body(file);
}catch (RuntimeException e){
logger.error(e.getMessage(),e);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(e.getMessage());
}catch (Exception e){
logger.error(DOWNLOAD_RESOURCE_FILE_ERROR.getMsg(),e);
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Status.DOWNLOAD_RESOURCE_FILE_ERROR.getMsg());
@ -658,21 +768,21 @@ public class ResourcesController extends BaseController{
* @param userId user id
* @return unauthorized result code
*/
@ApiOperation(value = "unauthorizedFile", notes= "UNAUTHORIZED_FILE_NOTES")
@ApiOperation(value = "authorizeResourceTree", notes= "AUTHORIZE_RESOURCE_TREE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "userId", value = "USER_ID", required = true, dataType ="Int", example = "100")
})
@GetMapping(value = "/unauth-file")
@GetMapping(value = "/authorize-resource-tree")
@ResponseStatus(HttpStatus.CREATED)
public Result unauthorizedFile(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result authorizeResourceTree(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("userId") Integer userId) {
try{
logger.info("resource unauthorized file, user:{}, unauthorized user id:{}", loginUser.getUserName(), userId);
Map<String, Object> result = resourceService.unauthorizedFile(loginUser, userId);
logger.info("all resource file, user:{}, user id:{}", loginUser.getUserName(), userId);
Map<String, Object> result = resourceService.authorizeResourceTree(loginUser, userId);
return returnDataList(result);
}catch (Exception e){
logger.error(UNAUTHORIZED_FILE_RESOURCE_ERROR.getMsg(),e);
return error(Status.UNAUTHORIZED_FILE_RESOURCE_ERROR.getCode(), Status.UNAUTHORIZED_FILE_RESOURCE_ERROR.getMsg());
logger.error(AUTHORIZE_RESOURCE_TREE.getMsg(),e);
return error(Status.AUTHORIZE_RESOURCE_TREE.getCode(), Status.AUTHORIZE_RESOURCE_TREE.getMsg());
}
}

29
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java

@ -0,0 +1,29 @@
package org.apache.dolphinscheduler.api.dto.resources;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* directory
*/
public class Directory extends ResourceComponent{
@Override
public boolean isDirctory() {
return true;
}
}

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java

@ -0,0 +1,24 @@
package org.apache.dolphinscheduler.api.dto.resources;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* file leaf
*/
public class FileLeaf extends ResourceComponent{
}

193
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java

@ -0,0 +1,193 @@
package org.apache.dolphinscheduler.api.dto.resources;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.annotation.JSONType;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import java.util.ArrayList;
import java.util.List;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* resource component
*/
@JSONType(orders={"id","pid","name","fullName","description","isDirctory","children","type"})
public abstract class ResourceComponent {
public ResourceComponent() {
}
public ResourceComponent(int id, int pid, String name, String fullName, String description, boolean isDirctory) {
this.id = id;
this.pid = pid;
this.name = name;
this.fullName = fullName;
this.description = description;
this.isDirctory = isDirctory;
int directoryFlag = isDirctory ? 1:0;
this.idValue = String.format("%s_%s",id,directoryFlag);
}
/**
* id
*/
@JSONField(ordinal = 1)
protected int id;
/**
* parent id
*/
@JSONField(ordinal = 2)
protected int pid;
/**
* name
*/
@JSONField(ordinal = 3)
protected String name;
/**
* current directory
*/
protected String currentDir;
/**
* full name
*/
@JSONField(ordinal = 4)
protected String fullName;
/**
* description
*/
@JSONField(ordinal = 5)
protected String description;
/**
* is directory
*/
@JSONField(ordinal = 6)
protected boolean isDirctory;
/**
* id value
*/
@JSONField(ordinal = 7)
protected String idValue;
/**
* resoruce type
*/
@JSONField(ordinal = 8)
protected ResourceType type;
/**
* children
*/
@JSONField(ordinal = 8)
protected List<ResourceComponent> children = new ArrayList<>();
/**
* add resource component
* @param resourceComponent resource component
*/
public void add(ResourceComponent resourceComponent){
children.add(resourceComponent);
}
public String getName(){
return this.name;
}
public String getDescription(){
return this.description;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getPid() {
return pid;
}
public void setPid(int pid) {
this.pid = pid;
}
public void setName(String name) {
this.name = name;
}
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public void setDescription(String description) {
this.description = description;
}
public boolean isDirctory() {
return isDirctory;
}
public void setDirctory(boolean dirctory) {
isDirctory = dirctory;
}
public String getIdValue() {
return idValue;
}
public void setIdValue(int id,boolean isDirctory) {
int directoryFlag = isDirctory ? 1:0;
this.idValue = String.format("%s_%s",id,directoryFlag);
}
public ResourceType getType() {
return type;
}
public void setType(ResourceType type) {
this.type = type;
}
public List<ResourceComponent> getChildren() {
return children;
}
public void setChildren(List<ResourceComponent> children) {
this.children = children;
}
@Override
public String toString() {
return "ResourceComponent{" +
"id=" + id +
", pid=" + pid +
", name='" + name + '\'' +
", currentDir='" + currentDir + '\'' +
", fullName='" + fullName + '\'' +
", description='" + description + '\'' +
", isDirctory=" + isDirctory +
", idValue='" + idValue + '\'' +
", type=" + type +
", children=" + children +
'}';
}
}

28
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
import org.apache.dolphinscheduler.dao.entity.Resource;
import java.util.List;
/**
* interface filter
*/
public interface IFilter {
List<Resource> filter();
}

100
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
import org.apache.dolphinscheduler.dao.entity.Resource;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* resource filter
*/
public class ResourceFilter implements IFilter {
/**
* resource suffix
*/
private String suffix;
/**
* resource list
*/
private List<Resource> resourceList;
/**
* parent list
*/
//Set<Resource> parentList = new HashSet<>();
/**
* constructor
* @param suffix resource suffix
* @param resourceList resource list
*/
public ResourceFilter(String suffix, List<Resource> resourceList) {
this.suffix = suffix;
this.resourceList = resourceList;
}
/**
* file filter
* @return file filtered by suffix
*/
public Set<Resource> fileFilter(){
Set<Resource> resources = resourceList.stream().filter(t -> {
String alias = t.getAlias();
return alias.endsWith(suffix);
}).collect(Collectors.toSet());
return resources;
}
/**
* list all parent dir
* @return parent resource dir set
*/
Set<Resource> listAllParent(){
Set<Resource> parentList = new HashSet<>();
Set<Resource> filterFileList = fileFilter();
for(Resource file:filterFileList){
parentList.add(file);
setAllParent(file,parentList);
}
return parentList;
}
/**
* list all parent dir
* @param resource resource
* @return parent resource dir set
*/
private void setAllParent(Resource resource,Set<Resource> parentList){
for (Resource resourceTemp : resourceList) {
if (resourceTemp.getId() == resource.getPid()) {
parentList.add(resourceTemp);
setAllParent(resourceTemp,parentList);
}
}
}
@Override
public List<Resource> filter() {
return new ArrayList<>(listAllParent());
}
}

130
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.visitor;
import org.apache.dolphinscheduler.api.dto.resources.Directory;
import org.apache.dolphinscheduler.api.dto.resources.FileLeaf;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.dao.entity.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* resource tree visitor
*/
public class ResourceTreeVisitor implements Visitor{
/**
* resource list
*/
private List<Resource> resourceList;
public ResourceTreeVisitor() {
}
/**
* constructor
* @param resourceList resource list
*/
public ResourceTreeVisitor(List<Resource> resourceList) {
this.resourceList = resourceList;
}
/**
* visit
* @return resoruce component
*/
public ResourceComponent visit() {
ResourceComponent rootDirectory = new Directory();
for (Resource resource : resourceList) {
// judge whether is root node
if (rootNode(resource)){
ResourceComponent tempResourceComponent = getResourceComponent(resource);
rootDirectory.add(tempResourceComponent);
tempResourceComponent.setChildren(setChildren(tempResourceComponent.getId(),resourceList));
}
}
return rootDirectory;
}
/**
* set children
* @param id id
* @param list resource list
* @return resource component list
*/
public static List<ResourceComponent> setChildren(int id, List<Resource> list ){
List<ResourceComponent> childList = new ArrayList<>();
for (Resource resource : list) {
if (id == resource.getPid()){
ResourceComponent tempResourceComponent = getResourceComponent(resource);
childList.add(tempResourceComponent);
}
}
for (ResourceComponent resourceComponent : childList) {
resourceComponent.setChildren(setChildren(resourceComponent.getId(),list));
}
if (childList.size()==0){
return new ArrayList<>();
}
return childList;
}
/**
* Determine whether it is the root node
* @param resource resource
* @return true if it is the root node
*/
public boolean rootNode(Resource resource) {
boolean isRootNode = true;
if(resource.getPid() != -1 ){
for (Resource parent : resourceList) {
if (resource.getPid() == parent.getId()) {
isRootNode = false;
break;
}
}
}
return isRootNode;
}
/**
* get resource component by resource
* @param resource resource
* @return resource component
*/
private static ResourceComponent getResourceComponent(Resource resource) {
ResourceComponent tempResourceComponent;
if(resource.isDirectory()){
tempResourceComponent = new Directory();
}else{
tempResourceComponent = new FileLeaf();
}
tempResourceComponent.setName(resource.getAlias());
tempResourceComponent.setFullName(resource.getFullName().replaceFirst("/",""));
tempResourceComponent.setId(resource.getId());
tempResourceComponent.setPid(resource.getPid());
tempResourceComponent.setIdValue(resource.getId(),resource.isDirectory());
tempResourceComponent.setDescription(resource.getDescription());
tempResourceComponent.setType(resource.getType());
return tempResourceComponent;
}
}

31
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java

@ -0,0 +1,31 @@
package org.apache.dolphinscheduler.api.dto.resources.visitor;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Visitor
*/
public interface Visitor {
/**
* visit
* @return resource component
*/
ResourceComponent visit();
}

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -97,7 +97,7 @@ public enum Status {
VERIFY_UDF_FUNCTION_NAME_ERROR( 10070,"verify udf function name error", "UDF函数名称验证错误"),
DELETE_UDF_FUNCTION_ERROR( 10071,"delete udf function error", "删除UDF函数错误"),
AUTHORIZED_FILE_RESOURCE_ERROR( 10072,"authorized file resource error", "授权资源文件错误"),
UNAUTHORIZED_FILE_RESOURCE_ERROR( 10073,"unauthorized file resource error", "查询未授权资源错误"),
AUTHORIZE_RESOURCE_TREE( 10073,"authorize resource tree display error","授权资源目录树错误"),
UNAUTHORIZED_UDF_FUNCTION_ERROR( 10074,"unauthorized udf function error", "查询未授权UDF函数错误"),
AUTHORIZED_UDF_FUNCTION_ERROR(10075,"authorized udf function error", "授权UDF函数错误"),
CREATE_SCHEDULE_ERROR(10076,"create schedule error", "创建调度配置错误"),
@ -184,10 +184,12 @@ public enum Status {
RESOURCE_SIZE_EXCEED_LIMIT(20007, "upload resource file size exceeds limit", "上传资源文件大小超过限制"),
RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified", "资源文件后缀不支持修改"),
UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar", "UDF资源文件后缀名只支持[jar]"),
HDFS_COPY_FAIL(20009, "hdfs copy {0} -> {1} fail", "hdfs复制失败:[{0}] -> [{1}]"),
RESOURCE_FILE_EXIST(20010, "resource file {0} already exists in hdfs,please delete it or change name!", "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"),
RESOURCE_FILE_NOT_EXIST(20011, "resource file {0} not exists in hdfs!", "资源文件[{0}]在hdfs中不存在"),
HDFS_COPY_FAIL(20010, "hdfs copy {0} -> {1} fail", "hdfs复制失败:[{0}] -> [{1}]"),
RESOURCE_FILE_EXIST(20011, "resource file {0} already exists in hdfs,please delete it or change name!", "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"),
RESOURCE_FILE_NOT_EXIST(20012, "resource file {0} not exists in hdfs!", "资源文件[{0}]在hdfs中不存在"),
UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}","udf函数绑定了资源文件[{0}]"),
RESOURCE_IS_USED(20014, "resource file is used by process definition","资源文件被上线的流程定义使用了"),
PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist","父资源文件不存在"),
USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),

34
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -38,11 +38,9 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
@ -162,6 +160,31 @@ public class ProcessDefinitionService extends BaseDAGService {
return result;
}
/**
* get resource ids
* @param processData process data
* @return resource ids
*/
private String getResourceIds(ProcessData processData) {
List<TaskNode> tasks = processData.getTasks();
Set<Integer> resourceIds = new HashSet<>();
for(TaskNode taskNode : tasks){
String taskParameter = taskNode.getParams();
AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter);
Set<Integer> tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet());
resourceIds.addAll(tempSet);
}
StringBuilder sb = new StringBuilder();
for(int i : resourceIds) {
if (sb.length() > 0) {
sb.append(",");
}
sb.append(i);
}
return sb.toString();
}
/**
* query proccess definition list
@ -946,7 +969,9 @@ public class ProcessDefinitionService extends BaseDAGService {
return result;
}
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//process data check
@ -1163,6 +1188,7 @@ public class ProcessDefinitionService extends BaseDAGService {
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception {
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//check process data

559
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -16,9 +16,15 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.collections.BeanMap;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
@ -39,6 +45,7 @@ import org.springframework.web.multipart.MultipartFile;
import java.text.MessageFormat;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.*;
@ -65,6 +72,82 @@ public class ResourcesService extends BaseService {
@Autowired
private ResourceUserMapper resourceUserMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
/**
* create directory
*
* @param loginUser login user
* @param name alias
* @param description description
* @param type type
* @param pid parent id
* @param currentDir current directory
* @return create directory result
*/
@Transactional(rollbackFor = Exception.class)
public Result createDirectory(User loginUser,
String name,
String description,
ResourceType type,
int pid,
String currentDir) {
Result result = new Result();
// if hdfs not startup
if (!PropertyUtils.getResUploadStartupState()){
logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
putMsg(result, Status.HDFS_NOT_STARTUP);
return result;
}
String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
if (pid != -1) {
Resource parentResource = resourcesMapper.selectById(pid);
if (parentResource == null) {
putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
return result;
}
if (!hasPerm(loginUser, parentResource.getUserId())) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
}
if (checkResourceExists(fullName, 0, type.ordinal())) {
logger.error("resource directory {} has exist, can't recreate", fullName);
putMsg(result, Status.RESOURCE_EXIST);
return result;
}
Date now = new Date();
Resource resource = new Resource(pid,name,fullName,true,description,name,loginUser.getId(),type,0,now,now);
try {
resourcesMapper.insert(resource);
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
Map<String, Object> resultMap = new HashMap<String, Object>();
for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
}
}
result.setData(resultMap);
} catch (Exception e) {
logger.error("resource already exists, can't recreate ", e);
throw new RuntimeException("resource already exists, can't recreate");
}
//create directory in hdfs
createDirecotry(loginUser,fullName,type,result);
return result;
}
/**
* create resource
*
@ -73,6 +156,8 @@ public class ResourcesService extends BaseService {
* @param desc description
* @param file file
* @param type type
* @param pid parent id
* @param currentDir current directory
* @return create result code
*/
@Transactional(rollbackFor = Exception.class)
@ -80,7 +165,9 @@ public class ResourcesService extends BaseService {
String name,
String desc,
ResourceType type,
MultipartFile file) {
MultipartFile file,
int pid,
String currentDir) {
Result result = new Result();
// if hdfs not startup
@ -123,7 +210,8 @@ public class ResourcesService extends BaseService {
}
// check resoure name exists
if (checkResourceExists(name, 0, type.ordinal())) {
String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
if (checkResourceExists(fullName, 0, type.ordinal())) {
logger.error("resource {} has exist, can't recreate", name);
putMsg(result, Status.RESOURCE_EXIST);
return result;
@ -131,7 +219,9 @@ public class ResourcesService extends BaseService {
Date now = new Date();
Resource resource = new Resource(name,file.getOriginalFilename(),desc,loginUser.getId(),type,file.getSize(),now,now);
Resource resource = new Resource(pid,name,fullName,false,desc,file.getOriginalFilename(),loginUser.getId(),type,file.getSize(),now,now);
try {
resourcesMapper.insert(resource);
@ -151,7 +241,7 @@ public class ResourcesService extends BaseService {
}
// fail upload
if (!upload(loginUser, name, file, type)) {
if (!upload(loginUser, fullName, file, type)) {
logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
putMsg(result, Status.HDFS_OPERATION_ERROR);
throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
@ -162,27 +252,29 @@ public class ResourcesService extends BaseService {
/**
* check resource is exists
*
* @param alias alias
* @param fullName fullName
* @param userId user id
* @param type type
* @return true if resource exists
*/
private boolean checkResourceExists(String alias, int userId, int type ){
List<Resource> resources = resourcesMapper.queryResourceList(alias, userId, type);
return CollectionUtils.isNotEmpty(resources);
}
private boolean checkResourceExists(String fullName, int userId, int type ){
List<Resource> resources = resourcesMapper.queryResourceList(fullName, userId, type);
if (resources != null && resources.size() > 0) {
return true;
}
return false;
}
/**
* update resource
*
* @param loginUser login user
* @param name alias
* @param resourceId resource id
* @param type resource type
* @param desc description
* @return update result code
* @param loginUser login user
* @param resourceId resource id
* @param name name
* @param desc description
* @param type resource type
* @return update result code
*/
@Transactional(rollbackFor = Exception.class)
public Result updateResource(User loginUser,
@ -216,7 +308,10 @@ public class ResourcesService extends BaseService {
}
//check resource aleady exists
if (!resource.getAlias().equals(name) && checkResourceExists(name, 0, type.ordinal())) {
String originFullName = resource.getFullName();
String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/")+1),name);
if (!resource.getAlias().equals(name) && checkResourceExists(fullName, 0, type.ordinal())) {
logger.error("resource {} already exists, can't recreate", name);
putMsg(result, Status.RESOURCE_EXIST);
return result;
@ -227,25 +322,41 @@ public class ResourcesService extends BaseService {
if (StringUtils.isEmpty(tenantCode)){
return result;
}
//get the file suffix
String nameWithSuffix = name;
String originResourceName = resource.getAlias();
String suffix = originResourceName.substring(originResourceName.lastIndexOf('.'));
if (!resource.isDirectory()) {
//get the file suffix
//if the name without suffix then add it ,else use the origin name
String nameWithSuffix = name;
if(!name.endsWith(suffix)){
nameWithSuffix = nameWithSuffix + suffix;
String suffix = originResourceName.substring(originResourceName.lastIndexOf("."));
//if the name without suffix then add it ,else use the origin name
if(!name.endsWith(suffix)){
nameWithSuffix = nameWithSuffix + suffix;
}
}
// updateResource data
List<Integer> childrenResource = listAllChildren(resource);
String oldFullName = resource.getFullName();
Date now = new Date();
resource.setAlias(nameWithSuffix);
resource.setFullName(fullName);
resource.setDescription(desc);
resource.setUpdateTime(now);
try {
resourcesMapper.updateById(resource);
if (resource.isDirectory() && CollectionUtils.isNotEmpty(childrenResource)) {
List<Resource> childResourceList = new ArrayList<>();
List<Resource> resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()]));
childResourceList = resourceList.stream().map(t -> {
t.setFullName(t.getFullName().replaceFirst(oldFullName, fullName));
t.setUpdateTime(now);
return t;
}).collect(Collectors.toList());
resourcesMapper.batchUpdateResource(childResourceList);
}
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
@ -267,15 +378,9 @@ public class ResourcesService extends BaseService {
// get file hdfs path
// delete hdfs file by type
String originHdfsFileName = "";
String destHdfsFileName = "";
if (resource.getType().equals(ResourceType.FILE)) {
originHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, originResourceName);
destHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, name);
} else if (resource.getType().equals(ResourceType.UDF)) {
originHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, originResourceName);
destHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, name);
}
String originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,originFullName);
String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);
try {
if (HadoopUtils.getInstance().exists(originHdfsFileName)) {
logger.info("hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName);
@ -303,7 +408,7 @@ public class ResourcesService extends BaseService {
* @param pageSize page size
* @return resource list page
*/
public Map<String, Object> queryResourceListPaging(User loginUser, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
public Map<String, Object> queryResourceListPaging(User loginUser, int direcotryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
HashMap<String, Object> result = new HashMap<>(5);
Page<Resource> page = new Page(pageNo, pageSize);
@ -312,7 +417,7 @@ public class ResourcesService extends BaseService {
userId= 0;
}
IPage<Resource> resourceIPage = resourcesMapper.queryResourcePaging(page,
userId, type.ordinal(), searchVal);
userId,direcotryId, type.ordinal(), searchVal);
PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
pageInfo.setTotalCount((int)resourceIPage.getTotal());
pageInfo.setLists(resourceIPage.getRecords());
@ -321,17 +426,46 @@ public class ResourcesService extends BaseService {
return result;
}
/**
* create direcoty
* @param loginUser login user
* @param fullName full name
* @param type resource type
* @param result Result
*/
private void createDirecotry(User loginUser,String fullName,ResourceType type,Result result){
// query tenant
String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
String directoryName = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
String resourceRootPath = HadoopUtils.getHdfsDir(type,tenantCode);
try {
if (!HadoopUtils.getInstance().exists(resourceRootPath)) {
createTenantDirIfNotExists(tenantCode);
}
if (!HadoopUtils.getInstance().mkdir(directoryName)) {
logger.error("create resource directory {} of hdfs failed",directoryName);
putMsg(result,Status.HDFS_OPERATION_ERROR);
throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
}
} catch (Exception e) {
logger.error("create resource directory {} of hdfs failed",directoryName);
putMsg(result,Status.HDFS_OPERATION_ERROR);
throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
}
}
/**
* upload file to hdfs
*
* @param loginUser
* @param name
* @param file
* @param loginUser login user
* @param fullName full name
* @param file file
*/
private boolean upload(User loginUser, String name, MultipartFile file, ResourceType type) {
private boolean upload(User loginUser, String fullName, MultipartFile file, ResourceType type) {
// save to local
String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
String nameSuffix = FileUtils.suffix(name);
String nameSuffix = FileUtils.suffix(fullName);
// determine file suffix
if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
@ -344,15 +478,8 @@ public class ResourcesService extends BaseService {
// save file to hdfs, and delete original file
String hdfsFilename = "";
String resourcePath = "";
if (type.equals(ResourceType.FILE)) {
hdfsFilename = HadoopUtils.getHdfsFilename(tenantCode, name);
resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
} else if (type.equals(ResourceType.UDF)) {
hdfsFilename = HadoopUtils.getHdfsUdfFilename(tenantCode, name);
resourcePath = HadoopUtils.getHdfsUdfDir(tenantCode);
}
String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode);
try {
// if tenant dir not exists
if (!HadoopUtils.getInstance().exists(resourcePath)) {
@ -377,13 +504,59 @@ public class ResourcesService extends BaseService {
public Map<String, Object> queryResourceList(User loginUser, ResourceType type) {
Map<String, Object> result = new HashMap<>(5);
List<Resource> resourceList;
Set<Resource> allResourceList = getAllResources(loginUser, type);
Visitor resourceTreeVisitor = new ResourceTreeVisitor(new ArrayList<>(allResourceList));
//JSONArray jsonArray = JSON.parseArray(JSON.toJSONString(resourceTreeVisitor.visit().getChildren(), SerializerFeature.SortField));
result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
putMsg(result,Status.SUCCESS);
return result;
}
/**
* get all resources
* @param loginUser login user
* @return all resource set
*/
private Set<Resource> getAllResources(User loginUser, ResourceType type) {
int userId = loginUser.getId();
boolean listChildren = true;
if(isAdmin(loginUser)){
userId = 0;
listChildren = false;
}
List<Resource> resourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal());
Set<Resource> allResourceList = new HashSet<>(resourceList);
if (listChildren) {
Set<Integer> authorizedIds = new HashSet<>();
List<Resource> authorizedDirecoty = resourceList.stream().filter(t->t.getUserId() != loginUser.getId() && t.isDirectory()).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(authorizedDirecoty)) {
for(Resource resource : authorizedDirecoty){
authorizedIds.addAll(listAllChildren(resource));
}
List<Resource> childrenResources = resourcesMapper.listResourceByIds(authorizedIds.toArray(new Integer[authorizedIds.size()]));
allResourceList.addAll(childrenResources);
}
}
resourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal());
result.put(Constants.DATA_LIST, resourceList);
return allResourceList;
}
/**
* query resource list
*
* @param loginUser login user
* @param type resource type
* @return resource list
*/
public Map<String, Object> queryResourceJarList(User loginUser, ResourceType type) {
Map<String, Object> result = new HashMap<>(5);
Set<Resource> allResourceList = getAllResources(loginUser, type);
List<Resource> resources = new ResourceFilter(".jar",new ArrayList<>(allResourceList)).filter();
Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources);
result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
putMsg(result,Status.SUCCESS);
return result;
@ -419,23 +592,51 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
//if resource type is UDF,need check whether it is bound by UDF functon
if (resource.getType() == (ResourceType.UDF)) {
List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(new int[]{resourceId});
if (CollectionUtils.isNotEmpty(udfFuncs)) {
logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString());
putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
return result;
}
}
Tenant tenant = tenantMapper.queryById(loginUser.getTenantId());
if (tenant == null){
putMsg(result, Status.TENANT_NOT_EXIST);
String tenantCode = getTenantCode(resource.getUserId(),result);
if (StringUtils.isEmpty(tenantCode)){
return result;
}
// get all resource id of process definitions those is released
Map<Integer, Set<Integer>> resourceProcessMap = getResourceProcessMap();
Set<Integer> resourceIdSet = resourceProcessMap.keySet();
// get all children of the resource
List<Integer> allChildren = listAllChildren(resource);
if (resourceIdSet.contains(resource.getPid())) {
logger.error("can't be deleted,because it is used of process definition");
putMsg(result, Status.RESOURCE_IS_USED);
return result;
}
resourceIdSet.retainAll(allChildren);
if (CollectionUtils.isNotEmpty(resourceIdSet)) {
logger.error("can't be deleted,because it is used of process definition");
for (Integer resId : resourceIdSet) {
logger.error("resource id:{} is used of process definition {}",resId,resourceProcessMap.get(resId));
}
putMsg(result, Status.RESOURCE_IS_USED);
return result;
}
String hdfsFilename = "";
// delete hdfs file by type
String tenantCode = tenant.getTenantCode();
hdfsFilename = getHdfsFileName(resource, tenantCode, hdfsFilename);
// get hdfs file by type
String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
//delete data in database
resourcesMapper.deleteById(resourceId);
resourcesMapper.deleteIds(allChildren.toArray(new Integer[allChildren.size()]));
resourceUserMapper.deleteResourceUser(0, resourceId);
//delete file on hdfs
HadoopUtils.getInstance().delete(hdfsFilename, false);
HadoopUtils.getInstance().delete(hdfsFilename, true);
putMsg(result, Status.SUCCESS);
return result;
@ -444,15 +645,15 @@ public class ResourcesService extends BaseService {
/**
* verify resource by name and type
* @param loginUser login user
* @param name resource alias
* @param type resource type
* @param fullName resource full name
* @param type resource type
* @return true if the resource name not exists, otherwise return false
*/
public Result verifyResourceName(String name, ResourceType type,User loginUser) {
public Result verifyResourceName(String fullName, ResourceType type,User loginUser) {
Result result = new Result();
putMsg(result, Status.SUCCESS);
if (checkResourceExists(name, 0, type.ordinal())) {
logger.error("resource type:{} name:{} has exist, can't create again.", type, name);
if (checkResourceExists(fullName, 0, type.ordinal())) {
logger.error("resource type:{} name:{} has exist, can't create again.", type, fullName);
putMsg(result, Status.RESOURCE_EXIST);
} else {
// query tenant
@ -461,9 +662,9 @@ public class ResourcesService extends BaseService {
String tenantCode = tenant.getTenantCode();
try {
String hdfsFilename = getHdfsFileName(type,tenantCode,name);
String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
if(HadoopUtils.getInstance().exists(hdfsFilename)){
logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, name,hdfsFilename);
logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, fullName,hdfsFilename);
putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename);
}
@ -480,6 +681,48 @@ public class ResourcesService extends BaseService {
return result;
}
/**
* verify resource by full name or pid and type
* @param fullName resource full name
* @param id resource id
* @param type resource type
* @return true if the resource full name or pid not exists, otherwise return false
*/
public Result queryResource(String fullName,Integer id,ResourceType type) {
Result result = new Result();
if (StringUtils.isBlank(fullName) && id == null) {
logger.error("You must input one of fullName and pid");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
return result;
}
if (StringUtils.isNotBlank(fullName)) {
List<Resource> resourceList = resourcesMapper.queryResource(fullName,type.ordinal());
if (CollectionUtils.isEmpty(resourceList)) {
logger.error("resource file not exist, resource full name {} ", fullName);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
putMsg(result, Status.SUCCESS);
result.setData(resourceList.get(0));
} else {
Resource resource = resourcesMapper.selectById(id);
if (resource == null) {
logger.error("resource file not exist, resource id {}", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
Resource parentResource = resourcesMapper.selectById(resource.getPid());
if (parentResource == null) {
logger.error("parent resource file not exist, resource id {}", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
putMsg(result, Status.SUCCESS);
result.setData(parentResource);
}
return result;
}
/**
* view resource file online
*
@ -501,7 +744,7 @@ public class ResourcesService extends BaseService {
// get resource by id
Resource resource = resourcesMapper.selectById(resourceId);
if (resource == null) {
logger.error("resouce file not exist, resource id {}", resourceId);
logger.error("resource file not exist, resource id {}", resourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@ -511,7 +754,7 @@ public class ResourcesService extends BaseService {
if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
if (!strList.contains(nameSuffix)) {
logger.error("resouce suffix {} not support view, resource id {}", nameSuffix, resourceId);
logger.error("resource suffix {} not support view, resource id {}", nameSuffix, resourceId);
putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
return result;
}
@ -523,7 +766,7 @@ public class ResourcesService extends BaseService {
}
// hdfs path
String hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias());
String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getFullName());
logger.info("resource hdfs path is {} ", hdfsFileName);
try {
if(HadoopUtils.getInstance().exists(hdfsFileName)){
@ -559,7 +802,7 @@ public class ResourcesService extends BaseService {
* @return create result code
*/
@Transactional(rollbackFor = Exception.class)
public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content) {
public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory) {
Result result = new Result();
// if resource upload startup
if (!PropertyUtils.getResUploadStartupState()){
@ -581,15 +824,16 @@ public class ResourcesService extends BaseService {
}
String name = fileName.trim() + "." + nameSuffix;
String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
result = verifyResourceName(name,type,loginUser);
result = verifyResourceName(fullName,type,loginUser);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
// save data
Date now = new Date();
Resource resource = new Resource(name,name,desc,loginUser.getId(),type,content.getBytes().length,now,now);
Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now);
resourcesMapper.insert(resource);
@ -605,7 +849,7 @@ public class ResourcesService extends BaseService {
String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
result = uploadContentToHdfs(name, tenantCode, content);
result = uploadContentToHdfs(fullName, tenantCode, content);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
throw new RuntimeException(result.getMsg());
}
@ -657,7 +901,7 @@ public class ResourcesService extends BaseService {
resourcesMapper.updateById(resource);
result = uploadContentToHdfs(resource.getAlias(), tenantCode, content);
result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
throw new RuntimeException(result.getMsg());
}
@ -665,10 +909,10 @@ public class ResourcesService extends BaseService {
}
/**
* @param resourceName
* @param tenantCode
* @param content
* @return
* @param resourceName resource name
* @param tenantCode tenant code
* @param content content
* @return result
*/
private Result uploadContentToHdfs(String resourceName, String tenantCode, String content) {
Result result = new Result();
@ -684,8 +928,8 @@ public class ResourcesService extends BaseService {
return result;
}
// get file hdfs path
hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resourceName);
// get resource file hdfs path
hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName);
String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
logger.info("resource hdfs path is {} ", hdfsFileName);
@ -729,11 +973,14 @@ public class ResourcesService extends BaseService {
logger.error("download file not exist, resource id {}", resourceId);
return null;
}
if (resource.isDirectory()) {
logger.error("resource id {} is directory,can't download it", resourceId);
throw new RuntimeException("cant't download directory");
}
User user = userMapper.queryDetailsById(resource.getUserId());
String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode();
String hdfsFileName = "";
hdfsFileName = getHdfsFileName(resource, tenantCode, hdfsFileName);
String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getAlias());
String localFileName = FileUtils.getDownloadFilename(resource.getAlias());
logger.info("resource hdfs path is {} ", hdfsFileName);
@ -743,6 +990,33 @@ public class ResourcesService extends BaseService {
}
/**
* list all file
*
* @param loginUser login user
* @param userId user id
* @return unauthorized result code
*/
public Map<String, Object> authorizeResourceTree(User loginUser, Integer userId) {
Map<String, Object> result = new HashMap<>();
if (checkAdmin(loginUser, result)) {
return result;
}
List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
List<ResourceComponent> list ;
if (CollectionUtils.isNotEmpty(resourceList)) {
Visitor visitor = new ResourceTreeVisitor(resourceList);
list = visitor.visit().getChildren();
}else {
list = new ArrayList<>(0);
}
result.put(Constants.DATA_LIST, list);
putMsg(result,Status.SUCCESS);
return result;
}
/**
* unauthorized file
*
@ -757,8 +1031,8 @@ public class ResourcesService extends BaseService {
return result;
}
List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
List<Object> list ;
if (CollectionUtils.isNotEmpty(resourceList)) {
List<Resource> list ;
if (resourceList != null && resourceList.size() > 0) {
Set<Resource> resourceSet = new HashSet<>(resourceList);
List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);
@ -767,15 +1041,12 @@ public class ResourcesService extends BaseService {
}else {
list = new ArrayList<>(0);
}
result.put(Constants.DATA_LIST, list);
Visitor visitor = new ResourceTreeVisitor(list);
result.put(Constants.DATA_LIST, visitor.visit().getChildren());
putMsg(result,Status.SUCCESS);
return result;
}
/**
* unauthorized udf function
*
@ -841,46 +1112,15 @@ public class ResourcesService extends BaseService {
return result;
}
List<Resource> authedResources = resourcesMapper.queryAuthorizedResourceList(userId);
result.put(Constants.DATA_LIST, authedResources);
Visitor visitor = new ResourceTreeVisitor(authedResources);
logger.info(JSON.toJSONString(visitor.visit(), SerializerFeature.SortField));
String jsonTreeStr = JSON.toJSONString(visitor.visit().getChildren(), SerializerFeature.SortField);
logger.info(jsonTreeStr);
result.put(Constants.DATA_LIST, visitor.visit().getChildren());
putMsg(result,Status.SUCCESS);
return result;
}
/**
* get hdfs file name
*
* @param resource resource
* @param tenantCode tenant code
* @param hdfsFileName hdfs file name
* @return hdfs file name
*/
private String getHdfsFileName(Resource resource, String tenantCode, String hdfsFileName) {
if (resource.getType().equals(ResourceType.FILE)) {
hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias());
} else if (resource.getType().equals(ResourceType.UDF)) {
hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, resource.getAlias());
}
return hdfsFileName;
}
/**
* get hdfs file name
*
* @param resourceType resource type
* @param tenantCode tenant code
* @param hdfsFileName hdfs file name
* @return hdfs file name
*/
private String getHdfsFileName(ResourceType resourceType, String tenantCode, String hdfsFileName) {
if (resourceType.equals(ResourceType.FILE)) {
hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, hdfsFileName);
} else if (resourceType.equals(ResourceType.UDF)) {
hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, hdfsFileName);
}
return hdfsFileName;
}
/**
* get authorized resource list
*
@ -920,4 +1160,69 @@ public class ResourcesService extends BaseService {
return tenant.getTenantCode();
}
/**
* list all children id
* @param resource resource
* @return all children id
*/
List<Integer> listAllChildren(Resource resource){
List<Integer> childList = new ArrayList<>();
if (resource.getId() != -1) {
childList.add(resource.getId());
}
if(resource.isDirectory()){
listAllChildren(resource.getId(),childList);
}
return childList;
}
/**
* list all children id
* @param resourceId resource id
* @param childList child list
*/
void listAllChildren(int resourceId,List<Integer> childList){
List<Integer> children = resourcesMapper.listChildren(resourceId);
for(int chlidId:children){
childList.add(chlidId);
listAllChildren(chlidId,childList);
}
}
/**
* get resource process map key is resource id,value is the set of process definition
* @return resource process definition map
*/
private Map<Integer,Set<Integer>> getResourceProcessMap(){
Map<Integer, String> map = new HashMap<>();
Map<Integer, Set<Integer>> result = new HashMap<>();
List<Map<String, Object>> list = processDefinitionMapper.listResources();
if (CollectionUtils.isNotEmpty(list)) {
for (Map<String, Object> tempMap : list) {
map.put((Integer) tempMap.get("id"), (String)tempMap.get("resource_ids"));
}
}
for (Map.Entry<Integer, String> entry : map.entrySet()) {
Integer mapKey = entry.getKey();
String[] arr = entry.getValue().split(",");
Set<Integer> mapValues = Arrays.stream(arr).map(Integer::parseInt).collect(Collectors.toSet());
for (Integer value : mapValues) {
if (result.containsKey(value)) {
Set<Integer> set = result.get(value);
set.add(mapKey);
result.put(value, set);
} else {
Set<Integer> set = new HashSet<>();
set.add(mapKey);
result.put(value, set);
}
}
}
return result;
}
}

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java

@ -118,7 +118,7 @@ public class UdfFuncService extends BaseService{
}
udf.setDescription(desc);
udf.setResourceId(resourceId);
udf.setResourceName(resource.getAlias());
udf.setResourceName(resource.getFullName());
udf.setType(type);
udf.setCreateTime(now);
@ -226,7 +226,7 @@ public class UdfFuncService extends BaseService{
}
udf.setDescription(desc);
udf.setResourceId(resourceId);
udf.setResourceName(resource.getAlias());
udf.setResourceName(resource.getFullName());
udf.setType(type);
udf.setUpdateTime(now);

58
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* resource filter test
*/
public class ResourceFilterTest {
private static Logger logger = LoggerFactory.getLogger(ResourceFilterTest.class);
@Test
public void filterTest(){
List<Resource> allList = new ArrayList<>();
Resource resource1 = new Resource(3,-1,"b","/b",true);
Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false);
Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false);
Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false);
Resource resource5 = new Resource(7,-1,"b2","/b2",true);
Resource resource6 = new Resource(8,-1,"b2","/b/b2",true);
Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false);
allList.add(resource1);
allList.add(resource2);
allList.add(resource3);
allList.add(resource4);
allList.add(resource5);
allList.add(resource6);
allList.add(resource7);
ResourceFilter resourceFilter = new ResourceFilter(".jar",allList);
List<Resource> resourceList = resourceFilter.filter();
Assert.assertNotNull(resourceList);
resourceList.stream().forEach(t-> logger.info(t.toString()));
}
}

82
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.resources.visitor;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* resource tree visitor test
*/
public class ResourceTreeVisitorTest {
@Test
public void visit() throws Exception {
List<Resource> resourceList = new ArrayList<>();
Resource resource1 = new Resource(3,-1,"b","/b",true);
Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false);
Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false);
Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false);
Resource resource5 = new Resource(7,-1,"b2","/b2",true);
Resource resource6 = new Resource(8,-1,"b2","/b/b2",true);
Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false);
resourceList.add(resource1);
resourceList.add(resource2);
resourceList.add(resource3);
resourceList.add(resource4);
resourceList.add(resource5);
resourceList.add(resource6);
resourceList.add(resource7);
ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList);
ResourceComponent resourceComponent = resourceTreeVisitor.visit();
Assert.assertNotNull(resourceComponent.getChildren());
}
@Test
public void rootNode() throws Exception {
List<Resource> resourceList = new ArrayList<>();
Resource resource1 = new Resource(3,-1,"b","/b",true);
Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false);
Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false);
Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false);
Resource resource5 = new Resource(7,-1,"b2","/b2",true);
Resource resource6 = new Resource(8,-1,"b2","/b/b2",true);
Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false);
resourceList.add(resource1);
resourceList.add(resource2);
resourceList.add(resource3);
resourceList.add(resource4);
resourceList.add(resource5);
resourceList.add(resource6);
resourceList.add(resource7);
ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList);
Assert.assertTrue(resourceTreeVisitor.rootNode(resource1));
Assert.assertTrue(resourceTreeVisitor.rootNode(resource2));
Assert.assertFalse(resourceTreeVisitor.rootNode(resource3));
}
}

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/enums/StatusTest.java

@ -28,7 +28,7 @@ public class StatusTest {
@Test
public void testGetCode() {
assertEquals(Status.SUCCESS.getCode(), 0);
assertEquals(0, Status.SUCCESS.getCode());
assertNotEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), 0);
}

112
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -24,10 +24,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@ -40,6 +37,7 @@ import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.omg.CORBA.Any;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@ -73,6 +71,8 @@ public class ResourcesServiceTest {
private UserMapper userMapper;
@Mock
private UdfFuncMapper udfFunctionMapper;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Before
public void setUp() {
@ -96,14 +96,14 @@ public class ResourcesServiceTest {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
User user = new User();
//HDFS_NOT_STARTUP
Result result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
Result result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//RESOURCE_FILE_IS_EMPTY
MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf",new String().getBytes());
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile);
result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_FILE_IS_EMPTY.getMsg(),result.getMsg());
@ -111,31 +111,42 @@ public class ResourcesServiceTest {
mockMultipartFile = new MockMultipartFile("test.pdf","test.pdf","pdf",new String("test").getBytes());
PowerMockito.when(FileUtils.suffix("test.pdf")).thenReturn("pdf");
PowerMockito.when(FileUtils.suffix("ResourcesServiceTest.jar")).thenReturn("jar");
result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile);
result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_SUFFIX_FORBID_CHANGE.getMsg(),result.getMsg());
//UDF_RESOURCE_SUFFIX_NOT_JAR
mockMultipartFile = new MockMultipartFile("ResourcesServiceTest.pdf","ResourcesServiceTest.pdf","pdf",new String("test").getBytes());
PowerMockito.when(FileUtils.suffix("ResourcesServiceTest.pdf")).thenReturn("pdf");
result = resourcesService.createResource(user,"ResourcesServiceTest.pdf","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile);
result = resourcesService.createResource(user,"ResourcesServiceTest.pdf","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg(),result.getMsg());
//UDF_RESOURCE_SUFFIX_NOT_JAR
Mockito.when(tenantMapper.queryById(0)).thenReturn(getTenant());
Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest.jar", 0, 1)).thenReturn(getResourceList());
mockMultipartFile = new MockMultipartFile("ResourcesServiceTest.jar","ResourcesServiceTest.jar","pdf",new String("test").getBytes());
result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile);
}
@Test
public void testCreateDirecotry(){
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
User user = new User();
//HDFS_NOT_STARTUP
Result result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//SUCCESS
Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest.jar", 0, 1)).thenReturn(new ArrayList<>());
result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile);
//PARENT_RESOURCE_NOT_EXIST
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
Mockito.when(resourcesMapper.selectById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
Assert.assertEquals(Status.PARENT_RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//RESOURCE_EXIST
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
Mockito.when(resourcesMapper.queryResourceList("/directoryTest", 0, 0)).thenReturn(getResourceList());
result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
}
@ -163,41 +174,46 @@ public class ResourcesServiceTest {
//SUCCESS
user.setId(1);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest.jar",ResourceType.FILE);
Mockito.when(userMapper.queryDetailsById(1)).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
//RESOURCE_EXIST
Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.FILE);
Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
//USER_NOT_EXIST
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
Mockito.when(userMapper.queryDetailsById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
logger.info(result.toString());
Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode());
//TENANT_NOT_EXIST
Mockito.when(userMapper.queryDetailsById(1)).thenReturn(getUser());
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg());
//RESOURCE_NOT_EXIST
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
PowerMockito.when(HadoopUtils.getHdfsFilename(Mockito.any(), Mockito.any())).thenReturn("test1");
PowerMockito.when(HadoopUtils.getHdfsResourceFileName(Mockito.any(), Mockito.any())).thenReturn("test1");
try {
Mockito.when(hadoopUtils.exists("test")).thenReturn(true);
} catch (IOException e) {
e.printStackTrace();
}
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//SUCCESS
PowerMockito.when(HadoopUtils.getHdfsFilename(Mockito.any(), Mockito.any())).thenReturn("test");
PowerMockito.when(HadoopUtils.getHdfsResourceFileName(Mockito.any(), Mockito.any())).thenReturn("test");
result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
@ -212,8 +228,8 @@ public class ResourcesServiceTest {
resourcePage.setTotal(1);
resourcePage.setRecords(getResourceList());
Mockito.when(resourcesMapper.queryResourcePaging(Mockito.any(Page.class),
Mockito.eq(0), Mockito.eq(0), Mockito.eq("test"))).thenReturn(resourcePage);
Map<String, Object> result = resourcesService.queryResourceListPaging(loginUser,ResourceType.FILE,"test",1,10);
Mockito.eq(0),Mockito.eq(-1), Mockito.eq(0), Mockito.eq("test"))).thenReturn(resourcePage);
Map<String, Object> result = resourcesService.queryResourceListPaging(loginUser,-1,ResourceType.FILE,"test",1,10);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
PageInfo pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
@ -263,6 +279,7 @@ public class ResourcesServiceTest {
//TENANT_NOT_EXIST
loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setTenantId(2);
Mockito.when(userMapper.queryDetailsById(Mockito.anyInt())).thenReturn(loginUser);
result = resourcesService.delete(loginUser,1);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(), result.getMsg());
@ -285,14 +302,20 @@ public class ResourcesServiceTest {
User user = new User();
user.setId(1);
Mockito.when(resourcesMapper.queryResourceList("test", 0, 0)).thenReturn(getResourceList());
Result result = resourcesService.verifyResourceName("test",ResourceType.FILE,user);
Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest.jar", 0, 0)).thenReturn(getResourceList());
Result result = resourcesService.verifyResourceName("/ResourcesServiceTest.jar",ResourceType.FILE,user);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(), result.getMsg());
//TENANT_NOT_EXIST
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
result = resourcesService.verifyResourceName("test1",ResourceType.FILE,user);
String unExistFullName = "/test.jar";
try {
Mockito.when(hadoopUtils.exists(unExistFullName)).thenReturn(false);
} catch (IOException e) {
logger.error("hadoop error",e);
}
result = resourcesService.verifyResourceName("/test.jar",ResourceType.FILE,user);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(), result.getMsg());
@ -304,10 +327,10 @@ public class ResourcesServiceTest {
} catch (IOException e) {
logger.error("hadoop error",e);
}
PowerMockito.when(HadoopUtils.getHdfsFilename("123", "test1")).thenReturn("test");
result = resourcesService.verifyResourceName("test1",ResourceType.FILE,user);
PowerMockito.when(HadoopUtils.getHdfsResourceFileName("123", "test1")).thenReturn("test");
result = resourcesService.verifyResourceName("/ResourcesServiceTest.jar",ResourceType.FILE,user);
logger.info(result.toString());
Assert.assertTrue(Status.RESOURCE_FILE_EXIST.getCode()==result.getCode());
Assert.assertTrue(Status.RESOURCE_EXIST.getCode()==result.getCode());
//SUCCESS
result = resourcesService.verifyResourceName("test2",ResourceType.FILE,user);
@ -389,14 +412,14 @@ public class ResourcesServiceTest {
PowerMockito.when(HadoopUtils.getHdfsUdfDir("udfDir")).thenReturn("udfDir");
User user = getUser();
//HDFS_NOT_STARTUP
Result result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content");
Result result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
PowerMockito.when(FileUtils.getResourceViewSuffixs()).thenReturn("class");
result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content");
result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(),result.getMsg());
@ -404,7 +427,7 @@ public class ResourcesServiceTest {
try {
PowerMockito.when(FileUtils.getResourceViewSuffixs()).thenReturn("jar");
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "desc", "content");
result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "desc", "content",-1,"/");
}catch (RuntimeException ex){
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), ex.getMessage());
@ -413,7 +436,7 @@ public class ResourcesServiceTest {
//SUCCESS
Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test");
PowerMockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true);
result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content");
result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
@ -584,13 +607,26 @@ public class ResourcesServiceTest {
private Resource getResource(){
Resource resource = new Resource();
resource.setPid(-1);
resource.setUserId(1);
resource.setDescription("ResourcesServiceTest.jar");
resource.setAlias("ResourcesServiceTest.jar");
resource.setFullName("/ResourcesServiceTest.jar");
resource.setType(ResourceType.FILE);
return resource;
}
private Resource getUdfResource(){
Resource resource = new Resource();
resource.setUserId(1);
resource.setDescription("udfTest");
resource.setAlias("udfTest.jar");
resource.setFullName("/udfTest.jar");
resource.setType(ResourceType.UDF);
return resource;
}
private UdfFunc getUdfFunc(){
UdfFunc udfFunc = new UdfFunc();

7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java

@ -43,6 +43,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Map;
import static org.junit.Assert.*;
@ -173,7 +174,11 @@ public class CheckUtilsTest {
// MapreduceParameters
MapreduceParameters mapreduceParameters = new MapreduceParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));
mapreduceParameters.setMainJar(new ResourceInfo());
ResourceInfo resourceInfoMapreduce = new ResourceInfo();
resourceInfoMapreduce.setId(1);
resourceInfoMapreduce.setRes("");
mapreduceParameters.setMainJar(resourceInfoMapreduce);
mapreduceParameters.setProgramType(ProgramType.JAVA);
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java

@ -23,13 +23,17 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
*/
public enum AuthorizationType {
/**
* 0 RESOURCE_FILE;
* 0 RESOURCE_FILE_ID;
* 0 RESOURCE_FILE_NAME;
* 1 UDF_FILE;
* 1 DATASOURCE;
* 2 UDF;
*/
RESOURCE_FILE(0, "resource file"),
DATASOURCE(1, "data source"),
UDF(2, "udf function");
RESOURCE_FILE_ID(0, "resource file id"),
RESOURCE_FILE_NAME(1, "resource file name"),
UDF_FILE(2, "udf file"),
DATASOURCE(3, "data source"),
UDF(4, "udf function");
AuthorizationType(int code, String descp){
this.code = code;

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java

@ -23,6 +23,16 @@ public class ResourceInfo {
/**
* res the name of the resource that was uploaded
*/
private int id;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
private String res;
public String getRes() {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import java.util.LinkedHashMap;
import java.util.List;
@ -31,7 +32,7 @@ public abstract class AbstractParameters implements IParameters {
public abstract boolean checkParameters();
@Override
public abstract List<String> getResourceFilesList();
public abstract List<ResourceInfo> getResourceFilesList();
/**
* local parameters

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java

@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import java.util.List;
/**
@ -34,5 +36,5 @@ public interface IParameters {
*
* @return resource files list
*/
List<String> getResourceFilesList();
List<ResourceInfo> getResourceFilesList();
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task.conditions;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.List;
@ -41,7 +42,7 @@ public class ConditionsParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return null;
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
/**
@ -198,7 +199,7 @@ public class DataxParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task.dependent;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.ArrayList;
@ -36,7 +37,7 @@ public class DependentParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}

29
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java

@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.flink;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* spark parameters
@ -50,35 +50,35 @@ public class FlinkParameters extends AbstractParameters {
private String mainArgs;
/**
* slot个数
* slot count
*/
private int slot;
/**
*Yarn application的名字
*Yarn application name
*/
private String appName;
/**
* taskManager 数量
* taskManager count
*/
private int taskManager;
/**
* jobManagerMemory 内存大小
* job manager memory
*/
private String jobManagerMemory ;
/**
* taskManagerMemory内存大小
* task manager memory
*/
private String taskManagerMemory;
/**
* resource list
*/
private List<ResourceInfo> resourceList;
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* The YARN queue to submit to
@ -207,16 +207,11 @@ public class FlinkParameters extends AbstractParameters {
@Override
public List<String> getResourceFilesList() {
if(resourceList != null ) {
List<String> resourceFiles = resourceList.stream()
.map(ResourceInfo::getRes).collect(Collectors.toList());
if(mainJar != null) {
resourceFiles.add(mainJar.getRes());
}
return resourceFiles;
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return Collections.emptyList();
return resourceList;
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.task.http;
import org.apache.dolphinscheduler.common.enums.HttpCheckCondition;
import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.process.HttpProperty;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils;
@ -62,7 +63,7 @@ public class HttpParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}

20
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java

@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.mr;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class MapreduceParameters extends AbstractParameters {
@ -54,7 +54,7 @@ public class MapreduceParameters extends AbstractParameters {
/**
* resource list
*/
private List<ResourceInfo> resourceList;
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* program type
@ -125,16 +125,12 @@ public class MapreduceParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
if(resourceList != null ) {
List<String> resourceFiles = resourceList.stream()
.map(ResourceInfo::getRes).collect(Collectors.toList());
if(mainJar != null) {
resourceFiles.add(mainJar.getRes());
}
return resourceFiles;
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return Collections.emptyList();
return resourceList;
}
@Override

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.task.procedure;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils;
@ -74,7 +75,7 @@ public class ProcedureParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.List;
import java.util.stream.Collectors;
public class PythonParameters extends AbstractParameters {
/**
@ -56,12 +55,7 @@ public class PythonParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
if (resourceList != null) {
return resourceList.stream()
.map(p -> p.getRes()).collect(Collectors.toList());
}
return null;
public List<ResourceInfo> getResourceFilesList() {
return this.resourceList;
}
}

9
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java

@ -59,12 +59,7 @@ public class ShellParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
if (resourceList != null) {
return resourceList.stream()
.map(p -> p.getRes()).collect(Collectors.toList());
}
return null;
public List<ResourceInfo> getResourceFilesList() {
return resourceList;
}
}

16
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java

@ -19,9 +19,10 @@ package org.apache.dolphinscheduler.common.task.spark;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* spark parameters
@ -77,7 +78,7 @@ public class SparkParameters extends AbstractParameters {
/**
* resource list
*/
private List<ResourceInfo> resourceList;
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* The YARN queue to submit to
@ -218,15 +219,12 @@ public class SparkParameters extends AbstractParameters {
return mainJar != null && programType != null && sparkVersion != null;
}
@Override
public List<String> getResourceFilesList() {
if(resourceList !=null ) {
this.resourceList.add(mainJar);
return resourceList.stream()
.map(ResourceInfo::getRes).collect(Collectors.toList());
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return null;
return resourceList;
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.task.sql;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils;
@ -189,7 +190,7 @@ public class SqlParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.task.sqoop;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -111,7 +112,7 @@ public class SqoopParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java

@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.subprocess;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.ArrayList;
@ -42,7 +43,7 @@ public class SubProcessParameters extends AbstractParameters {
}
@Override
public List<String> getResourceFilesList() {
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
}

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java

@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class Stopper {
private static volatile AtomicBoolean signal = new AtomicBoolean(false);
private static AtomicBoolean signal = new AtomicBoolean(false);
public static final boolean isStopped(){
return signal.get();

49
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -26,6 +26,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.IOUtils;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
@ -415,6 +416,22 @@ public class HadoopUtils implements Closeable {
}
}
/**
* hdfs resource dir
*
* @param tenantCode tenant code
* @return hdfs resource dir
*/
public static String getHdfsDir(ResourceType resourceType,String tenantCode) {
String hdfsDir = "";
if (resourceType.equals(ResourceType.FILE)) {
hdfsDir = getHdfsResDir(tenantCode);
} else if (resourceType.equals(ResourceType.UDF)) {
hdfsDir = getHdfsUdfDir(tenantCode);
}
return hdfsDir;
}
/**
* hdfs resource dir
*
@ -450,22 +467,42 @@ public class HadoopUtils implements Closeable {
* get absolute path and name for file on hdfs
*
* @param tenantCode tenant code
* @param filename file name
* @param fileName file name
* @return get absolute path and name for file on hdfs
*/
/**
* get hdfs file name
*
* @param resourceType resource type
* @param tenantCode tenant code
* @param fileName file name
* @return hdfs file name
*/
public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName);
}
/**
* get absolute path and name for resource file on hdfs
*
* @param tenantCode tenant code
* @param fileName file name
* @return get absolute path and name for file on hdfs
*/
public static String getHdfsFilename(String tenantCode, String filename) {
return String.format("%s/%s", getHdfsResDir(tenantCode), filename);
public static String getHdfsResourceFileName(String tenantCode, String fileName) {
return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
}
/**
* get absolute path and name for udf file on hdfs
*
* @param tenantCode tenant code
* @param filename file name
* @param fileName file name
* @return get absolute path and name for udf file on hdfs
*/
public static String getHdfsUdfFilename(String tenantCode, String filename) {
return String.format("%s/%s", getHdfsUdfDir(tenantCode), filename);
public static String getHdfsUdfFileName(String tenantCode, String fileName) {
return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
}
/**

14
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.junit.Assert;
import org.junit.Test;
@ -28,8 +29,7 @@ public class FlinkParametersTest {
@Test
public void getResourceFilesList() {
FlinkParameters flinkParameters = new FlinkParameters();
Assert.assertNotNull(flinkParameters.getResourceFilesList());
Assert.assertTrue(flinkParameters.getResourceFilesList().isEmpty());
Assert.assertTrue(CollectionUtils.isEmpty(flinkParameters.getResourceFilesList()));
ResourceInfo mainResource = new ResourceInfo();
mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar");
@ -41,15 +41,17 @@ public class FlinkParametersTest {
resourceInfos.add(resourceInfo1);
flinkParameters.setResourceList(resourceInfos);
Assert.assertNotNull(flinkParameters.getResourceFilesList());
Assert.assertEquals(2, flinkParameters.getResourceFilesList().size());
List<ResourceInfo> resourceFilesList = flinkParameters.getResourceFilesList();
Assert.assertNotNull(resourceFilesList);
Assert.assertEquals(2, resourceFilesList.size());
ResourceInfo resourceInfo2 = new ResourceInfo();
resourceInfo2.setRes("testFlinkParameters2.jar");
resourceInfos.add(resourceInfo2);
flinkParameters.setResourceList(resourceInfos);
Assert.assertNotNull(flinkParameters.getResourceFilesList());
Assert.assertEquals(3, flinkParameters.getResourceFilesList().size());
resourceFilesList = flinkParameters.getResourceFilesList();
Assert.assertNotNull(resourceFilesList);
Assert.assertEquals(3, resourceFilesList.size());
}
}

6
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringTest.java

@ -24,12 +24,6 @@ import java.util.List;
public class StringTest {
@Test
public void test1(){
System.out.println(String.format("%s_%010d_%010d", String.valueOf(1), Long.valueOf(3), Integer.valueOf(4)));
}
@Test
public void stringCompareTest(){

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java

@ -163,6 +163,11 @@ public class ProcessDefinition {
*/
private String modifyBy;
/**
* resource ids
*/
private String resourceIds;
public String getName() {
return name;
@ -334,6 +339,14 @@ public class ProcessDefinition {
this.scheduleReleaseState = scheduleReleaseState;
}
public String getResourceIds() {
return resourceIds;
}
public void setResourceIds(String resourceIds) {
this.resourceIds = resourceIds;
}
public int getTimeout() {
return timeout;
}
@ -393,6 +406,8 @@ public class ProcessDefinition {
", timeout=" + timeout +
", tenantId=" + tenantId +
", modifyBy='" + modifyBy + '\'' +
", resourceIds='" + resourceIds + '\'' +
'}';
}
}

68
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java

@ -32,11 +32,26 @@ public class Resource {
@TableId(value="id", type=IdType.AUTO)
private int id;
/**
* parent id
*/
private int pid;
/**
* resource alias
*/
private String alias;
/**
* full name
*/
private String fullName;
/**
* is directory
*/
private boolean isDirectory=false;
/**
* description
*/
@ -89,7 +104,15 @@ public class Resource {
this.updateTime = updateTime;
}
public Resource(String alias, String fileName, String description, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
public Resource(int id, int pid, String alias, String fullName, boolean isDirectory) {
this.id = id;
this.pid = pid;
this.alias = alias;
this.fullName = fullName;
this.isDirectory = isDirectory;
}
/*public Resource(String alias, String fileName, String description, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
this.alias = alias;
this.fileName = fileName;
this.description = description;
@ -98,6 +121,20 @@ public class Resource {
this.size = size;
this.createTime = createTime;
this.updateTime = updateTime;
}*/
public Resource(int pid, String alias, String fullName, boolean isDirectory, String description, String fileName, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
this.pid = pid;
this.alias = alias;
this.fullName = fullName;
this.isDirectory = isDirectory;
this.description = description;
this.fileName = fileName;
this.userId = userId;
this.type = type;
this.size = size;
this.createTime = createTime;
this.updateTime = updateTime;
}
public int getId() {
@ -116,6 +153,30 @@ public class Resource {
this.alias = alias;
}
public int getPid() {
return pid;
}
public void setPid(int pid) {
this.pid = pid;
}
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public boolean isDirectory() {
return isDirectory;
}
public void setDirectory(boolean directory) {
isDirectory = directory;
}
public String getFileName() {
return fileName;
}
@ -177,9 +238,12 @@ public class Resource {
public String toString() {
return "Resource{" +
"id=" + id +
", pid=" + pid +
", alias='" + alias + '\'' +
", fileName='" + fileName + '\'' +
", fullName='" + fullName + '\'' +
", isDirectory=" + isDirectory +
", description='" + description + '\'' +
", fileName='" + fileName + '\'' +
", userId=" + userId +
", type=" + type +
", size=" + size +

11
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java

@ -20,9 +20,11 @@ import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.MapKey;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
/**
* process definition mapper interface
@ -83,7 +85,7 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
List<ProcessDefinition> queryDefinitionListByTenant(@Param("tenantId") int tenantId);
/**
* count process definition group by user
* count process definition group by user
* @param userId userId
* @param projectIds projectIds
* @param isAdmin isAdmin
@ -93,4 +95,11 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
@Param("userId") Integer userId,
@Param("projectIds") Integer[] projectIds,
@Param("isAdmin") boolean isAdmin);
/**
* list all resource ids
* @return resource ids list
*/
@MapKey("id")
List<Map<String, Object>> listResources();
}

53
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java

@ -30,12 +30,12 @@ public interface ResourceMapper extends BaseMapper<Resource> {
/**
* query resource list
* @param alias alias
* @param fullName full name
* @param userId userId
* @param type type
* @return resource list
*/
List<Resource> queryResourceList(@Param("alias") String alias,
List<Resource> queryResourceList(@Param("fullName") String fullName,
@Param("userId") int userId,
@Param("type") int type);
@ -59,6 +59,7 @@ public interface ResourceMapper extends BaseMapper<Resource> {
*/
IPage<Resource> queryResourcePaging(IPage<Resource> page,
@Param("userId") int userId,
@Param("id") int id,
@Param("type") int type,
@Param("searchVal") String searchVal);
@ -76,13 +77,13 @@ public interface ResourceMapper extends BaseMapper<Resource> {
*/
List<Resource> queryResourceExceptUserId(@Param("userId") int userId);
/**
* query tenant code by name
* @param resName resource name
* @param resType resource type
* @return tenant code
*/
String queryTenantCodeByResourceName(@Param("resName") String resName);
String queryTenantCodeByResourceName(@Param("resName") String resName,@Param("resType") int resType);
/**
* list authorized resource
@ -91,4 +92,48 @@ public interface ResourceMapper extends BaseMapper<Resource> {
* @return resource list
*/
<T> List<Resource> listAuthorizedResource(@Param("userId") int userId,@Param("resNames")T[] resNames);
/**
* list authorized resource
* @param userId userId
* @param resIds resource ids
* @return resource list
*/
<T> List<Resource> listAuthorizedResourceById(@Param("userId") int userId,@Param("resIds")T[] resIds);
/**
* delete resource by id array
* @param resIds resource id array
* @return delete num
*/
int deleteIds(@Param("resIds")Integer[] resIds);
/**
* list children
* @param direcotyId directory id
* @return resource id array
*/
List<Integer> listChildren(@Param("direcotyId") int direcotyId);
/**
* query resource by full name or pid
* @param fullName full name
* @param type resource type
* @return resource
*/
List<Resource> queryResource(@Param("fullName") String fullName,@Param("type") int type);
/**
* list resource by id array
* @param resIds resource id array
* @return resource list
*/
List<Resource> listResourceByIds(@Param("resIds")Integer[] resIds);
/**
* update resource
* @param resourceList resource list
* @return update num
*/
int batchUpdateResource(@Param("resourceList") List<Resource> resourceList);
}

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java

@ -86,4 +86,19 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
*/
<T> List<UdfFunc> listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds);
/**
* list UDF by resource id
* @param resourceIds resource id array
* @return UDF function list
*/
List<UdfFunc> listUdfByResourceId(@Param("resourceIds") int[] resourceIds);
/**
* list authorized UDF by resource id
* @param resourceIds resource id array
* @return UDF function list
*/
List<UdfFunc> listAuthorizedUdfByResourceId(@Param("userId") int userId,@Param("resourceIds") int[] resourceIds);
}

7
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml

@ -87,4 +87,11 @@
pd.user_id = u.id AND pd.project_id = p.id
AND pd.id = #{processDefineId}
</select>
<select id="listResources" resultType="java.util.HashMap">
SELECT id,resource_ids
FROM t_ds_process_definition
WHERE release_state = 1 and resource_ids is not null and resource_ids != ''
</select>
</mapper>

79
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml

@ -22,8 +22,8 @@
select *
from t_ds_resources
where 1= 1
<if test="alias != null and alias != ''">
and alias = #{alias}
<if test="fullName != null and fullName != ''">
and full_name = #{fullName}
</if>
<if test="type != -1">
and type = #{type}
@ -47,8 +47,8 @@
<select id="queryResourcePaging" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where type=#{type}
<if test="userId != 0">
where type=#{type} and pid=#{id}
<if test="userId != 0 and id == -1">
and id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId}
union select id as resources_id from t_ds_resources where user_id=#{userId})
</if>
@ -70,7 +70,74 @@
<select id="queryTenantCodeByResourceName" resultType="java.lang.String">
select tenant_code
from t_ds_tenant t, t_ds_user u, t_ds_resources res
where t.id = u.tenant_id and u.id = res.user_id and res.type=0
and res.alias= #{resName}
where t.id = u.tenant_id and u.id = res.user_id and res.type=#{resType}
and res.full_name= #{resName}
</select>
<select id="listAuthorizedResource" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where type=0
and id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId}
union select id as resources_id from t_ds_resources where user_id=#{userId})
<if test="resNames != null and resNames != ''">
and full_name in
<foreach collection="resNames" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
<select id="listAuthorizedResourceById" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where id in (select resources_id from t_ds_relation_resources_user where user_id=#{userId}
union select id as resources_id from t_ds_resources where user_id=#{userId})
<if test="resIds != null and resIds != ''">
and id in
<foreach collection="resIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
<delete id="deleteIds" parameterType="java.lang.Integer">
delete from t_ds_resources where id in
<foreach collection="resIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</delete>
<select id="listChildren" resultType="java.lang.Integer">
select id
from t_ds_resources
where pid = #{direcotyId}
</select>
<select id="queryResource" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where type = #{type}
and full_name = #{fullName}
</select>
<update id="batchUpdateResource" parameterType="java.util.List">
<foreach collection="resourceList" item="resource" index="index" open="" close="" separator =";">
update t_ds_resources
<set>
full_name=#{resource.fullName},
update_time=#{resource.updateTime}
</set>
<where>
id=#{resource.id}
</where>
</foreach>
</update>
<select id="listResourceByIds" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
select *
from t_ds_resources
where id in
<foreach collection="resIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</select>
</mapper>

24
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml

@ -87,4 +87,28 @@
</foreach>
</if>
</select>
<select id="listUdfByResourceId" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select *
from t_ds_udfs
where 1=1
<if test="resourceIds != null and resourceIds != ''">
and resource_id in
<foreach collection="resourceIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
<select id="listAuthorizedUdfByResourceId" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select *
from t_ds_udfs
where
id in (select udf_id from t_ds_relation_udfs_user where user_id=#{userId}
union select id as udf_id from t_ds_udfs where user_id=#{userId})
<if test="resourceIds != null and resourceIds != ''">
and resource_id in
<foreach collection="resourceIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
</mapper>

57
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java

@ -34,6 +34,7 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@ -68,7 +69,10 @@ public class ResourceMapperTest {
private Resource insertOne(){
//insertOne
Resource resource = new Resource();
resource.setAlias("ut resource");
resource.setAlias("ut-resource");
resource.setFullName("/ut-resource");
resource.setPid(-1);
resource.setDirectory(false);
resource.setType(ResourceType.FILE);
resource.setUserId(111);
resourceMapper.insert(resource);
@ -80,16 +84,32 @@ public class ResourceMapperTest {
* @param user user
* @return Resource
*/
private Resource createResource(User user){
private Resource createResource(User user,boolean isDirectory,ResourceType resourceType,int pid,String alias,String fullName){
//insertOne
Resource resource = new Resource();
resource.setAlias(String.format("ut resource %s",user.getUserName()));
resource.setType(ResourceType.FILE);
resource.setDirectory(isDirectory);
resource.setType(resourceType);
resource.setAlias(alias);
resource.setFullName(fullName);
resource.setUserId(user.getId());
resourceMapper.insert(resource);
return resource;
}
/**
* create resource by user
* @param user user
* @return Resource
*/
private Resource createResource(User user){
//insertOne
String alias = String.format("ut-resource-%s",user.getUserName());
String fullName = String.format("/%s",alias);
Resource resource = createResource(user, false, ResourceType.FILE, -1, alias, fullName);
return resource;
}
/**
* create user
* @return User
@ -200,13 +220,15 @@ public class ResourceMapperTest {
IPage<Resource> resourceIPage = resourceMapper.queryResourcePaging(
page,
resource.getUserId(),
0,
-1,
resource.getType().ordinal(),
""
);
IPage<Resource> resourceIPage1 = resourceMapper.queryResourcePaging(
page,
1110,
-1,
resource.getType().ordinal(),
""
);
@ -289,7 +311,7 @@ public class ResourceMapperTest {
resourceMapper.updateById(resource);
String resource1 = resourceMapper.queryTenantCodeByResourceName(
resource.getAlias()
resource.getFullName(),ResourceType.FILE.ordinal()
);
@ -305,22 +327,37 @@ public class ResourceMapperTest {
User generalUser2 = createGeneralUser("user2");
// create one resource
Resource resource = createResource(generalUser2);
Resource unauthorizedResource = createResource(generalUser2);
Resource unauthorizedResource = createResource(generalUser1);
// need download resources
String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()};
String[] resNames = new String[]{resource.getFullName(), unauthorizedResource.getFullName()};
List<Resource> resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertEquals(generalUser2.getId(),resource.getUserId());
Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
Assert.assertFalse(resources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames)));
// authorize object unauthorizedResource to generalUser
createResourcesUser(unauthorizedResource,generalUser2);
List<Resource> authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
Assert.assertTrue(authorizedResources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames)));
}
@Test
public void deleteIdsTest(){
// create a general user
User generalUser1 = createGeneralUser("user1");
Resource resource = createResource(generalUser1);
Resource resource1 = createResource(generalUser1);
List<Integer> resourceList = new ArrayList<>();
resourceList.add(resource.getId());
resourceList.add(resource1.getId());
int result = resourceMapper.deleteIds(resourceList.toArray(new Integer[resourceList.size()]));
Assert.assertEquals(result,2);
}
}

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java

@ -95,7 +95,7 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
* set task instance state
* @return
*/
private Boolean setTaskInstanceState(){
private boolean setTaskInstanceState(){
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
if(subProcessInstance == null || taskInstance.getState().typeIsFinished()){
return false;
@ -131,8 +131,8 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
if (taskInstance.getState().typeIsFinished()) {
logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}",
this.taskInstance.getName(),
this.taskInstance.getState().toString(),
this.processInstance.getState().toString());
this.taskInstance.getState(),
this.processInstance.getState());
return;
}
while (Stopper.isRunning()) {

50
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -23,15 +23,18 @@ import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
@ -96,7 +99,7 @@ public class TaskScheduleThread implements Runnable {
TaskNode taskNode = JSON.parseObject(taskInstance.getTaskJson(), TaskNode.class);
// get resource files
List<String> resourceFiles = createProjectResFiles(taskNode);
List<ResourceInfo> resourceFiles = createProjectResFiles(taskNode);
// copy hdfs/minio file to local
downloadResource(
taskInstance.getExecutePath(),
@ -165,6 +168,7 @@ public class TaskScheduleThread implements Runnable {
new Date(),
taskInstance.getId());
}
/**
* get global paras map
* @return
@ -289,14 +293,16 @@ public class TaskScheduleThread implements Runnable {
/**
* create project resource files
*/
private List<String> createProjectResFiles(TaskNode taskNode) throws Exception{
private List<ResourceInfo> createProjectResFiles(TaskNode taskNode) throws Exception{
Set<String> projectFiles = new HashSet<>();
Set<ResourceInfo> projectFiles = new HashSet<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) {
List<String> projectResourceFiles = baseParam.getResourceFilesList();
projectFiles.addAll(projectResourceFiles);
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (projectResourceFiles != null) {
projectFiles.addAll(projectResourceFiles);
}
}
return new ArrayList<>(projectFiles);
@ -309,18 +315,25 @@ public class TaskScheduleThread implements Runnable {
* @param projectRes
* @param logger
*/
private void downloadResource(String execLocalPath, List<String> projectRes, Logger logger) throws Exception {
private void downloadResource(String execLocalPath, List<ResourceInfo> projectRes, Logger logger) throws Exception {
checkDownloadPermission(projectRes);
for (String res : projectRes) {
File resFile = new File(execLocalPath, res);
String resourceName;
for (ResourceInfo res : projectRes) {
if (res.getId() != 0) {
Resource resource = processService.getResourceById(res.getId());
resourceName = resource.getFullName();
}else{
resourceName = res.getRes();
}
File resFile = new File(execLocalPath, resourceName);
if (!resFile.exists()) {
try {
// query the tenant code of the resource according to the name of the resource
String tentnCode = processService.queryTenantCodeByResName(res);
String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode, res);
String tentnCode = processService.queryTenantCodeByResName(resourceName, ResourceType.FILE);
String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tentnCode, resourceName);
logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + res, false, true);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resourceName, false, true);
}catch (Exception e){
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
@ -336,10 +349,17 @@ public class TaskScheduleThread implements Runnable {
* @param projectRes resource name list
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws Exception {
private void checkDownloadPermission(List<ResourceInfo> projectRes) throws Exception {
int userId = taskInstance.getProcessInstance().getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
permissionCheck.checkPermission();
if (projectRes.stream().allMatch(t->t.getId() == 0)) {
String[] resNames = projectRes.stream().map(t -> t.getRes()).collect(Collectors.toList()).toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_NAME,processService,resNames,userId,logger);
permissionCheck.checkPermission();
}else{
Integer[] resIds = projectRes.stream().map(t -> t.getId()).collect(Collectors.toList()).toArray(new Integer[projectRes.size()]);
PermissionCheck<Integer> permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resIds,userId,logger);
permissionCheck.checkPermission();
}
}
}

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java

@ -94,4 +94,9 @@ public abstract class AbstractYarnTask extends AbstractTask {
* @throws Exception exception
*/
protected abstract String buildCommand() throws Exception;
/**
* set main jar name
*/
protected abstract void setMainJarName();
}

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java

@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.server.worker.task.flink;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
@ -63,6 +65,7 @@ public class FlinkTask extends AbstractYarnTask {
if (!flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");
}
setMainJarName();
flinkParameters.setQueue(taskProps.getQueue());
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
@ -111,6 +114,28 @@ public class FlinkTask extends AbstractYarnTask {
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
flinkParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return flinkParameters;

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java

@ -19,11 +19,13 @@ package org.apache.dolphinscheduler.server.worker.task.mr;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
@ -64,7 +66,7 @@ public class MapReduceTask extends AbstractYarnTask {
if (!mapreduceParameters.checkParameters()) {
throw new RuntimeException("mapreduce task params is not valid");
}
setMainJarName();
mapreduceParameters.setQueue(taskProps.getQueue());
// replace placeholder
@ -99,6 +101,28 @@ public class MapReduceTask extends AbstractYarnTask {
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = mapreduceParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(mapreduceParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
mapreduceParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return mapreduceParameters;

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -143,13 +143,16 @@ public class ShellTask extends AbstractTask {
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if(paramsMap != null && taskProps.getScheduleTime()!=null) {
String dateTime = DateUtils.format(taskProps.getScheduleTime(), Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_SHECDULE_TIME);
paramsMap.put(Constants.PARAMETER_SHECDULE_TIME, p);
// new
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (paramsMap != null) {
if (taskProps.getScheduleTime() != null) {
String dateTime = DateUtils.format(taskProps.getScheduleTime(), Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_SHECDULE_TIME);
paramsMap.put(Constants.PARAMETER_SHECDULE_TIME, p);
}
script = ParameterUtils.convertParameterPlaceholders2(script, ParamUtils.convert(paramsMap));
}

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java

@ -18,11 +18,13 @@ package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
@ -67,8 +69,8 @@ public class SparkTask extends AbstractYarnTask {
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
setMainJarName();
sparkParameters.setQueue(taskProps.getQueue());
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();
@ -115,6 +117,28 @@ public class SparkTask extends AbstractYarnTask {
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = sparkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
sparkParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return sparkParameters;

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -378,7 +378,7 @@ public class SqlTask extends AbstractTask {
List<User> users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
// receiving group list
List<String> receviersList = new ArrayList<String>();
List<String> receviersList = new ArrayList<>();
for(User user:users){
receviersList.add(user.getEmail().trim());
}
@ -392,7 +392,7 @@ public class SqlTask extends AbstractTask {
}
// copy list
List<String> receviersCcList = new ArrayList<String>();
List<String> receviersCcList = new ArrayList<>();
// Custom Copier
String receiversCc = sqlParameters.getReceiversCc();
if (StringUtils.isNotEmpty(receiversCc)){
@ -406,7 +406,7 @@ public class SqlTask extends AbstractTask {
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
Map<String, Object> mailResult = MailUtils.sendMails(receviersList,
receviersCcList, title, content, ShowType.valueOf(showTypeName));
if(!(Boolean) mailResult.get(STATUS)){
if(!(boolean) mailResult.get(STATUS)){
throw new RuntimeException("send mail failed!");
}
}else{
@ -463,22 +463,7 @@ public class SqlTask extends AbstractTask {
ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckUdf = new PermissionCheck<Integer>(AuthorizationType.UDF, processService,udfFunIds,userId,logger);
PermissionCheck<Integer> permissionCheckUdf = new PermissionCheck<>(AuthorizationType.UDF, processService,udfFunIds,userId,logger);
permissionCheckUdf.checkPermission();
}
/**
* check data source permission
* @param dataSourceId data source id
* @return if has download permission return true else false
*/
private void checkDataSourcePermission(int dataSourceId) throws Exception{
// process instance
ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckDataSource = new PermissionCheck<Integer>(AuthorizationType.DATASOURCE, processService,new Integer[]{dataSourceId},userId,logger);
permissionCheckDataSource.checkPermission();
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java

@ -71,6 +71,10 @@ public class SqoopTask extends AbstractYarnTask {
return null;
}
@Override
protected void setMainJarName() {
}
@Override
public AbstractParameters getParameters() {
return sqoopParameters;

60
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java

@ -25,13 +25,10 @@ import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.*;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
@ -45,6 +42,7 @@ import java.util.Date;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(OSUtils.class)
@PowerMockIgnore({"javax.management.*"})
public class ShellTaskTest {
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
@ -136,6 +134,28 @@ public class ShellTaskTest {
}
}
@Test
public void testInitException() {
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams("{\"rawScript\": \"\"}");
ShellTask shellTask = new ShellTask(props, logger);
try {
shellTask.init();
} catch (Exception e) {
logger.info(e.getMessage(), e);
if (e.getMessage().contains("shell task params is not valid")) {
Assert.assertTrue(true);
}
}
}
/**
* Method: init for Windows
*/
@ -157,7 +177,20 @@ public class ShellTaskTest {
public void testHandleForUnix() throws Exception {
try {
PowerMockito.when(OSUtils.isWindows()).thenReturn(false);
shellTask.handle();
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setScheduleTime(new Date());
props.setCmdTypeIfComplement(CommandType.START_PROCESS);
props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}");
ShellTask shellTask1 = new ShellTask(props, logger);
shellTask1.init();
shellTask1.handle();
Assert.assertTrue(true);
} catch (Error | Exception e) {
if (!e.getMessage().contains("process error . exitCode is : -1")
@ -174,7 +207,20 @@ public class ShellTaskTest {
public void testHandleForWindows() throws Exception {
try {
Assume.assumeTrue(OSUtils.isWindows());
shellTask.handle();
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setScheduleTime(new Date());
props.setCmdTypeIfComplement(CommandType.START_PROCESS);
props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}");
ShellTask shellTask1 = new ShellTask(props, logger);
shellTask1.init();
shellTask1.handle();
Assert.assertTrue(true);
} catch (Error | Exception e) {
if (!e.getMessage().contains("process error . exitCode is : -1")) {

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java

@ -135,7 +135,7 @@ public class SparkTaskTest {
logger.info("spark task command : {}", sparkArgs);
Assert.assertEquals(sparkArgs.split(" ")[0], SPARK2_COMMAND );
Assert.assertEquals(SPARK2_COMMAND, sparkArgs.split(" ")[0]);
}
}

31
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.service.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -45,6 +46,11 @@ public class PermissionCheck<T> {
*/
private T[] needChecks;
/**
* resoruce info
*/
private List<ResourceInfo> resourceList;
/**
* user id
*/
@ -90,6 +96,22 @@ public class PermissionCheck<T> {
this.logger = logger;
}
/**
* permission check
* @param logger
* @param authorizationType
* @param processService
* @param resourceList
* @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId,Logger logger) {
this.authorizationType = authorizationType;
this.processService = processService;
this.resourceList = resourceList;
this.userId = userId;
this.logger = logger;
}
public AuthorizationType getAuthorizationType() {
return authorizationType;
}
@ -122,6 +144,14 @@ public class PermissionCheck<T> {
this.userId = userId;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
/**
* has permission
* @return true if has permission
@ -141,6 +171,7 @@ public class PermissionCheck<T> {
*/
public void checkPermission() throws Exception{
if(this.needChecks.length > 0){
// get user type in order to judge whether the user is admin
User user = processService.getUserById(userId);
if (user.getUserType() != UserType.ADMIN_USER){

26
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1556,10 +1556,11 @@ public class ProcessService {
/**
* find tenant code by resource name
* @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
public String queryTenantCodeByResName(String resName){
return resourceMapper.queryTenantCodeByResourceName(resName);
public String queryTenantCodeByResName(String resName,ResourceType resourceType){
return resourceMapper.queryTenantCodeByResourceName(resName,resourceType.ordinal());
}
/**
@ -1791,10 +1792,18 @@ public class ProcessService {
Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
switch (authorizationType){
case RESOURCE_FILE:
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getAlias()).collect(toSet());
case RESOURCE_FILE_ID:
Set<Integer> authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedResourceFiles);
break;
case RESOURCE_FILE_NAME:
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getFullName()).collect(toSet());
originResSet.removeAll(authorizedResources);
break;
case UDF_FILE:
Set<Integer> authorizedUdfFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedUdfFiles);
break;
case DATASOURCE:
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedDatasources);
@ -1820,5 +1829,14 @@ public class ProcessService {
return userMapper.queryDetailsById(userId);
}
/**
* get resource by resoruce id
* @param resoruceId resource id
* @return Resource
*/
public Resource getResourceById(int resoruceId){
return resourceMapper.selectById(resoruceId);
}
}

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js

@ -724,7 +724,7 @@ JSP.prototype.handleEvent = function () {
} else {
$(`#${sourceId}`).attr('data-nodenumber',Number($(`#${sourceId}`).attr('data-nodenumber'))+1)
}
// Storage node dependency information
saveTargetarr(sourceId, targetId)

0
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js

0
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue

2
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue

@ -116,4 +116,4 @@
},
components: { mPopup, mListBoxF }
}
</script>
</script>

0
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue

0
dolphinscheduler-ui/src/js/conf/home/store/resource/actions.js

0
dolphinscheduler-ui/src/js/module/components/fileUpdate/fileUpdate.vue

2
sql/soft_version

@ -1 +1 @@
1.2.0
1.2.2

82
sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql

@ -74,4 +74,84 @@ d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_C_app_link;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_C_app_link;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_C_app_link;
-- ac_dolphin_T_t_ds_resources_A_pid
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_A_pid;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_resources_A_pid()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_resources'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='pid')
THEN
ALTER TABLE t_ds_resources ADD `pid` int(11) DEFAULT -1 COMMENT 'parent id';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_resources_A_pid;
DROP PROCEDURE ac_dolphin_T_t_ds_resources_A_pid;
-- ac_dolphin_T_t_ds_resources_A_full_name
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_A_full_name;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_resources_A_full_name()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_resources'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='full_name')
THEN
ALTER TABLE t_ds_resources ADD `full_name` varchar(255) DEFAULT NULL COMMENT 'full name';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_resources_A_full_name;
DROP PROCEDURE ac_dolphin_T_t_ds_resources_A_full_name;
-- ac_dolphin_T_t_ds_resources_A_pid
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_is_directory;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_resources_is_directory()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_resources'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='is_directory')
THEN
ALTER TABLE t_ds_resources ADD `is_directory` tinyint(1) DEFAULT 0 COMMENT 'is directory';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_resources_is_directory;
DROP PROCEDURE ac_dolphin_T_t_ds_resources_is_directory;
-- ac_dolphin_T_t_ds_process_definition_A_resource_ids
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_process_definition_A_resource_ids;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_process_definition_A_resource_ids()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_definition'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='resource_ids')
THEN
ALTER TABLE t_ds_process_definition ADD `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource ids';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_process_definition_A_resource_ids;
DROP PROCEDURE ac_dolphin_T_t_ds_process_definition_A_resource_ids;

85
sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -66,4 +66,87 @@ d//
delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_C_app_link();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_C_app_link();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_C_app_link();
-- ac_dolphin_T_t_ds_resources_A_pid
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_pid() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_resources'
AND COLUMN_NAME ='pid')
THEN
ALTER TABLE t_ds_resources ADD COLUMN pid int DEFAULT -1;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_resources_A_pid();
DROP FUNCTION ac_dolphin_T_t_ds_resources_A_pid();
-- ac_dolphin_T_t_ds_resources_A_full_name
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_resources_A_full_name();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_full_name() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_resources'
AND COLUMN_NAME ='full_name')
THEN
ALTER TABLE t_ds_resources ADD COLUMN full_name varchar(255) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_resources_A_full_name();
DROP FUNCTION ac_dolphin_T_t_ds_resources_A_full_name();
-- ac_dolphin_T_t_ds_resources_A_is_directory
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_resources_A_is_directory();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_is_directory() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_resources'
AND COLUMN_NAME ='is_directory')
THEN
ALTER TABLE t_ds_resources ADD COLUMN is_directory boolean DEFAULT false;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_resources_A_is_directory();
DROP FUNCTION ac_dolphin_T_t_ds_resources_A_is_directory();
-- ac_dolphin_T_t_ds_process_definition_A_resource_ids
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_process_definition_A_resource_ids();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_process_definition_A_resource_ids() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_process_definition'
AND COLUMN_NAME ='resource_ids')
THEN
ALTER TABLE t_ds_process_definition ADD COLUMN resource_ids varchar(255) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_process_definition_A_resource_ids();
DROP FUNCTION ac_dolphin_T_t_ds_process_definition_A_resource_ids();

4
sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql

@ -13,4 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
*/
UPDATE t_ds_resources SET pid=-1,is_directory=false WHERE pid IS NULL;
UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
Loading…
Cancel
Save