Browse Source

[Bug] [Resource] fix resource delete bug (#15003)

augit-log
liyou 8 months ago committed by GitHub
parent
commit
d6a900667a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
  2. 168
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 119
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  4. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  5. 9
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  6. 11
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  7. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
  8. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  9. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  10. 20
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

11
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@ -22,11 +22,13 @@ import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -255,4 +257,13 @@ public interface TaskDefinitionService {
ReleaseState releaseState); ReleaseState releaseState);
void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion); void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion);
/**
* queryAll TaskDefinition
*
* @param releaseState workFlow releaseState
* @param flag task flag
* @return TaskDefinition list
*/
List<TaskDefinition> queryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag(ReleaseState releaseState, Flag flag);
} }

168
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -254,14 +254,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* create process definition * create process definition
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param name process definition name * @param name process definition name
* @param description description * @param description description
* @param globalParams global params * @param globalParams global params
* @param locations locations for nodes * @param locations locations for nodes
* @param timeout timeout * @param timeout timeout
* @param taskRelationJson relation json for nodes * @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson * @param taskDefinitionJson taskDefinitionJson
* @return create result code * @return create result code
*/ */
@ -342,7 +342,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* create single process definition * create single process definition
* *
* @param loginUser login user * @param loginUser login user
* @param workflowCreateRequest the new workflow object will be created * @param workflowCreateRequest the new workflow object will be created
* @return New ProcessDefinition object created just now * @return New ProcessDefinition object created just now
*/ */
@ -493,7 +493,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* query process definition list * query process definition list
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @return definition list * @return definition list
*/ */
@ -516,7 +516,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* query process definition simple list * query process definition simple list
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @return definition simple list * @return definition simple list
*/ */
@ -547,12 +547,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* query process definition list paging * query process definition list paging
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param searchVal search value * @param searchVal search value
* @param userId user id * @param userId user id
* @param pageNo page number * @param pageNo page number
* @param pageSize page size * @param pageSize page size
* @return process definition page * @return process definition page
*/ */
@Override @Override
@ -602,7 +602,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* Filter resource process definitions * Filter resource process definitions
* *
* @param loginUser login user * @param loginUser login user
* @param workflowFilterRequest workflow filter requests * @param workflowFilterRequest workflow filter requests
* @return List process definition * @return List process definition
*/ */
@ -642,9 +642,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* query detail of process definition * query detail of process definition
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param code process definition code * @param code process definition code
* @return process definition detail * @return process definition detail
*/ */
@Override @Override
@ -673,7 +673,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* query detail of process definition * query detail of process definition
* *
* @param loginUser login user * @param loginUser login user
* @param code process definition code * @param code process definition code
* @return process definition detail * @return process definition detail
*/ */
@Override @Override
@ -734,17 +734,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* update process definition * update process definition
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param name process definition name * @param name process definition name
* @param code process definition code * @param code process definition code
* @param description description * @param description description
* @param globalParams global params * @param globalParams global params
* @param locations locations for nodes * @param locations locations for nodes
* @param timeout timeout * @param timeout timeout
* @param taskRelationJson relation json for nodes * @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson * @param taskDefinitionJson taskDefinitionJson
* @param otherParamsJson otherParamsJson handle other params * @param otherParamsJson otherParamsJson handle other params
* @return update result code * @return update result code
*/ */
@Override @Override
@ -811,11 +811,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* Task want to delete whether used in other task, should throw exception when have be used. * Task want to delete whether used in other task, should throw exception when have be used.
* * <p>
* This function avoid delete task already dependencies by other tasks by accident. * This function avoid delete task already dependencies by other tasks by accident.
* *
* @param processDefinition ProcessDefinition you change task definition and task relation * @param processDefinition ProcessDefinition you change task definition and task relation
* @param taskRelationList All the latest task relation list from process definition * @param taskRelationList All the latest task relation list from process definition
*/ */
private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition, private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition,
List<ProcessTaskRelationLog> taskRelationList) { List<ProcessTaskRelationLog> taskRelationList) {
@ -923,9 +923,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* verify process definition name unique * verify process definition name unique
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param name name * @param name name
* @return true if process definition name not exists, otherwise false * @return true if process definition name not exists, otherwise false
*/ */
@Override @Override
@ -995,7 +995,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* Process definition want to delete whether used in other task, should throw exception when have be used. * Process definition want to delete whether used in other task, should throw exception when have be used.
* * <p>
* This function avoid delete process definition already dependencies by other tasks by accident. * This function avoid delete process definition already dependencies by other tasks by accident.
* *
* @param processDefinition ProcessDefinition you change task definition and task relation * @param processDefinition ProcessDefinition you change task definition and task relation
@ -1076,9 +1076,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* release process definition: online / offline * release process definition: online / offline
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param code process definition code * @param code process definition code
* @param releaseState release state * @param releaseState release state
* @return release result code * @return release result code
*/ */
@ -1243,9 +1243,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* import process definition * import process definition
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param file process metadata json file * @param file process metadata json file
* @return import process * @return import process
*/ */
@Override @Override
@ -1717,9 +1717,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* get task node details based on process definition * get task node details based on process definition
* *
* @param loginUser loginUser * @param loginUser loginUser
* @param projectCode project code * @param projectCode project code
* @param code process definition code * @param code process definition code
* @return task node list * @return task node list
*/ */
@Override @Override
@ -1746,9 +1746,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* get task node details map based on process definition * get task node details map based on process definition
* *
* @param loginUser loginUser * @param loginUser loginUser
* @param projectCode project code * @param projectCode project code
* @param codes define codes * @param codes define codes
* @return task node list * @return task node list
*/ */
@Override @Override
@ -1799,7 +1799,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* query process definition all by project code * query process definition all by project code
* *
* @param loginUser loginUser * @param loginUser loginUser
* @param projectCode project code * @param projectCode project code
* @return process definitions in the project * @return process definitions in the project
*/ */
@ -1839,7 +1839,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* query process definition list by process definition code * query process definition list by process definition code
* *
* @param projectCode project code * @param projectCode project code
* @param processDefinitionCode process definition code * @param processDefinitionCode process definition code
* @return task definition list in the process definition * @return task definition list in the process definition
*/ */
@ -1875,8 +1875,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* Encapsulates the TreeView structure * Encapsulates the TreeView structure
* *
* @param projectCode project code * @param projectCode project code
* @param code process definition code * @param code process definition code
* @param limit limit * @param limit limit
* @return tree view json data * @return tree view json data
*/ */
@Override @Override
@ -2038,9 +2038,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* batch copy process definition * batch copy process definition
* *
* @param loginUser loginUser * @param loginUser loginUser
* @param projectCode projectCode * @param projectCode projectCode
* @param codes processDefinitionCodes * @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode * @param targetProjectCode targetProjectCode
*/ */
@Override @Override
@ -2062,9 +2062,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* batch move process definition * batch move process definition
* Will be deleted * Will be deleted
* @param loginUser loginUser *
* @param projectCode projectCode * @param loginUser loginUser
* @param codes processDefinitionCodes * @param projectCode projectCode
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode * @param targetProjectCode targetProjectCode
*/ */
@Override @Override
@ -2238,8 +2239,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* get new Task name or Process name when copy or import operate * get new Task name or Process name when copy or import operate
*
* @param originalName Task or Process original name * @param originalName Task or Process original name
* @param suffix "_copy_" or "_import_" * @param suffix "_copy_" or "_import_"
* @return new name * @return new name
*/ */
public String getNewName(String originalName, String suffix) { public String getNewName(String originalName, String suffix) {
@ -2261,10 +2263,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* switch the defined process definition version * switch the defined process definition version
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param code process definition code * @param code process definition code
* @param version the version user want to switch * @param version the version user want to switch
* @return switch process definition version result code * @return switch process definition version result code
*/ */
@Override @Override
@ -2315,11 +2317,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* check batch operate result * check batch operate result
* *
* @param srcProjectCode srcProjectCode * @param srcProjectCode srcProjectCode
* @param targetProjectCode targetProjectCode * @param targetProjectCode targetProjectCode
* @param result result * @param result result
* @param failedProcessList failedProcessList * @param failedProcessList failedProcessList
* @param isCopy isCopy * @param isCopy isCopy
*/ */
private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode, private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode,
Map<String, Object> result, List<String> failedProcessList, boolean isCopy) { Map<String, Object> result, List<String> failedProcessList, boolean isCopy) {
@ -2346,11 +2348,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* query the pagination versions info by one certain process definition code * query the pagination versions info by one certain process definition code
* *
* @param loginUser login user info to check auth * @param loginUser login user info to check auth
* @param projectCode project code * @param projectCode project code
* @param pageNo page number * @param pageNo page number
* @param pageSize page size * @param pageSize page size
* @param code process definition code * @param code process definition code
* @return the pagination process definition versions info of the certain process definition * @return the pagination process definition versions info of the certain process definition
*/ */
@Override @Override
@ -2382,10 +2384,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* delete one certain process definition by version number and process definition code * delete one certain process definition by version number and process definition code
* *
* @param loginUser login user info to check auth * @param loginUser login user info to check auth
* @param projectCode project code * @param projectCode project code
* @param code process definition code * @param code process definition code
* @param version version number * @param version version number
* @return delete result code * @return delete result code
*/ */
@Override @Override
@ -2564,16 +2566,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* update process definition basic info * update process definition basic info
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param name process definition name * @param name process definition name
* @param code process definition code * @param code process definition code
* @param description description * @param description description
* @param globalParams globalParams * @param globalParams globalParams
* @param timeout timeout * @param timeout timeout
* @param scheduleJson scheduleJson * @param scheduleJson scheduleJson
* @param otherParamsJson otherParamsJson handle other params * @param otherParamsJson otherParamsJson handle other params
* @param executionType executionType * @param executionType executionType
* @return update result code * @return update result code
*/ */
@Override @Override
@ -2679,9 +2681,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* update single resource workflow * update single resource workflow
* *
* @param loginUser login user * @param loginUser login user
* @param workflowCode workflow resource code want to update * @param workflowCode workflow resource code want to update
* @param workflowUpdateRequest workflow update resource object * @param workflowUpdateRequest workflow update resource object
* @return Process definition * @return Process definition
*/ */
@Override @Override
@ -2925,6 +2927,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* save other relation * save other relation
*
* @param loginUser * @param loginUser
* @param processDefinition * @param processDefinition
* @param result * @param result
@ -2938,6 +2941,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* get Json String * get Json String
*
* @param loginUser * @param loginUser
* @param processDefinition * @param processDefinition
* @return Json String * @return Json String
@ -2949,7 +2953,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* view process variables * view process variables
* @param loginUser login user *
* @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param code process definition code * @param code process definition code
* @return variables data * @return variables data
@ -3032,6 +3037,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/** /**
* delete other relation * delete other relation
*
* @param project * @param project
* @param result * @param result
* @param processDefinition * @param processDefinition

119
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -35,22 +35,24 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics; import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics;
import org.apache.dolphinscheduler.api.service.ResourcesService; import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.api.utils.RegexUtils;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
@ -118,10 +120,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
private ResourceUserMapper resourceUserMapper; private ResourceUserMapper resourceUserMapper;
@Autowired @Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper; private TaskDefinitionService taskDefinitionService;
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired(required = false) @Autowired(required = false)
private StorageOperate storageOperate; private StorageOperate storageOperate;
@ -129,11 +128,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* create directory * create directory
* *
* @param loginUser login user * @param loginUser login user
* @param name alias * @param name alias
* @param type type * @param type type
* @param pid parent id * @param pid parent id
* @param currentDir current directory * @param currentDir current directory
* @return create directory result * @return create directory result
*/ */
@Override @Override
@ -271,7 +270,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
* update the folder's size of the resource * update the folder's size of the resource
* *
* @param resource the current resource * @param resource the current resource
* @param size size * @param size size
*/ */
private void updateParentResourceSize(Resource resource, long size) { private void updateParentResourceSize(Resource resource, long size) {
if (resource.getSize() > 0) { if (resource.getSize() > 0) {
@ -316,13 +315,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* update resource * update resource
* *
* @param loginUser login user * @param loginUser login user
* @param resourceFullName resource full name * @param resourceFullName resource full name
* @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource, * @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource,
* can be different from the login user in the case of logging in as admin users. * can be different from the login user in the case of logging in as admin users.
* @param name name * @param name name
* @param type resource type * @param type resource type
* @param file resource file * @param file resource file
* @return update result code * @return update result code
*/ */
@Override @Override
@ -410,6 +409,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// if name unchanged, return directly without moving on HDFS // if name unchanged, return directly without moving on HDFS
if (originResourceName.equals(name) && file == null) { if (originResourceName.equals(name) && file == null) {
return result; return result;
} else {
result = verifyResourceExistsOnlineUsage(resourceFullName);
if (result.isFailed()) {
return result;
}
} }
if (file != null) { if (file != null) {
@ -509,14 +513,14 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* query resources list paging * query resources list paging
* *
* @param loginUser login user * @param loginUser login user
* @param fullName resource full name * @param fullName resource full name
* @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource, * @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource,
* can be different from the login user in the case of logging in as admin users. * can be different from the login user in the case of logging in as admin users.
* @param type resource type * @param type resource type
* @param searchVal search value * @param searchVal search value
* @param pageNo page number * @param pageNo page number
* @param pageSize page size * @param pageSize page size
* @return resource list page * @return resource list page
*/ */
@Override @Override
@ -818,7 +822,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* transform resource object into StorageEntity object * transform resource object into StorageEntity object
* *
* @param resource a resource object * @param resource a resource object
* @return a storageEntity object * @return a storageEntity object
*/ */
private StorageEntity createStorageEntityBasedOnResource(Resource resource) { private StorageEntity createStorageEntityBasedOnResource(Resource resource) {
@ -837,8 +841,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* delete resource * delete resource
* *
* @param loginUser login user * @param loginUser login user
* @param fullName resource full name * @param fullName resource full name
* @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource, * @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource,
* can be different from the login user in the case of logging in as admin users. * can be different from the login user in the case of logging in as admin users.
* @return delete result code * @return delete result code
@ -899,14 +903,41 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
} }
} }
result = verifyResourceExistsOnlineUsage(fullName);
if (result.isFailed()) {
return result;
}
// delete file on hdfs,S3 // delete file on hdfs,S3
storageOperate.delete(fullName, allChildren, true); storageOperate.delete(fullName, allChildren, true);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
/**
* validate whether usage resource
*
* @param fullName fullName
* @return {@link Result}
*/
private Result verifyResourceExistsOnlineUsage(String fullName) {
Result<Object> result = new Result<>();
putMsg(result, Status.SUCCESS);
// query all online work flow task
List<TaskDefinition> taskDefinitions = taskDefinitionService
.queryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag(ReleaseState.ONLINE, Flag.YES);
if (CollectionUtils.isNotEmpty(taskDefinitions)) {
for (TaskDefinition taskDefinition : taskDefinitions) {
if (taskDefinition.getTaskParamResourceNames().contains(fullName)) {
putMsg(result, Status.RESOURCE_IS_USED);
break;
}
}
}
return result;
}
private String RemoveResourceFromResourceList(String stringToDelete, String taskParameter, boolean isDir) { private String RemoveResourceFromResourceList(String stringToDelete, String taskParameter, boolean isDir) {
Map<String, Object> taskParameters = JSONUtils.parseObject( Map<String, Object> taskParameters = JSONUtils.parseObject(
taskParameter, taskParameter,
@ -959,8 +990,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* verify resource by full name or pid and type * verify resource by full name or pid and type
* *
* @param fileName resource file name * @param fileName resource file name
* @param type resource type * @param type resource type
* @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource, * @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource,
* can be different from the login user in the case of logging in as admin users. * can be different from the login user in the case of logging in as admin users.
* @return true if the resource full name or pid not exists, otherwise return false * @return true if the resource full name or pid not exists, otherwise return false
@ -1010,7 +1041,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* get resource by id * get resource by id
* @param fullName resource full name *
* @param fullName resource full name
* @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource, * @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource,
* can be different from the login user in the case of logging in as admin users. * can be different from the login user in the case of logging in as admin users.
* @return resource * @return resource
@ -1057,10 +1089,10 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* view resource file online * view resource file online
* *
* @param fullName resource fullName * @param fullName resource fullName
* @param resTenantCode owner's tenant code of the resource * @param resTenantCode owner's tenant code of the resource
* @param skipLineNum skip line number * @param skipLineNum skip line number
* @param limit limit * @param limit limit
* @return resource content * @return resource content
*/ */
@Override @Override
@ -1253,10 +1285,10 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* updateProcessInstance resource * updateProcessInstance resource
* *
* @param fullName resource full name * @param fullName resource full name
* @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource, * @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource,
* can be different from the login user in the case of logging in as admin users. * can be different from the login user in the case of logging in as admin users.
* @param content content * @param content content
* @return update result cod * @return update result cod
*/ */
@Override @Override
@ -1317,9 +1349,9 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
} }
/** /**
* @param fullName resource full name * @param fullName resource full name
* @param tenantCode tenant code * @param tenantCode tenant code
* @param content content * @param content content
* @return result * @return result
*/ */
private Result<Object> uploadContentToStorage(User loginUser, String fullName, String tenantCode, String content) { private Result<Object> uploadContentToStorage(User loginUser, String fullName, String tenantCode, String content) {
@ -1365,6 +1397,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* download file * download file
*
* @return resource content * @return resource content
*/ */
@Override @Override
@ -1751,10 +1784,10 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/** /**
* check permission by comparing login user's tenantCode with tenantCode in the request * check permission by comparing login user's tenantCode with tenantCode in the request
* *
* @param isAdmin is the login user admin * @param isAdmin is the login user admin
* @param userTenantCode loginUser's tenantCode * @param userTenantCode loginUser's tenantCode
* @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource, * @param resTenantCode tenantCode in the request field "resTenantCode" for tenant code owning the resource,
* can be different from the login user in the case of logging in as admin users. * can be different from the login user in the case of logging in as admin users.
* @return isValid * @return isValid
*/ */
private boolean isUserTenantValid(boolean isAdmin, String userTenantCode, private boolean isUserTenantValid(boolean isAdmin, String userTenantCode,

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -1326,4 +1326,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
// delete task workflow relation // delete task workflow relation
processTaskRelationService.deleteByWorkflowDefinitionCode(workflowDefinitionCode, workflowDefinitionVersion); processTaskRelationService.deleteByWorkflowDefinitionCode(workflowDefinitionCode, workflowDefinitionVersion);
} }
@Override
public List<TaskDefinition> queryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag(ReleaseState releaseState,
Flag flag) {
return taskDefinitionMapper.queryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag(releaseState, flag);
}
} }

9
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -28,14 +28,16 @@ import org.apache.dolphinscheduler.api.service.impl.ResourcesServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
@ -109,7 +111,7 @@ public class ResourcesServiceTest {
private UdfFuncMapper udfFunctionMapper; private UdfFuncMapper udfFunctionMapper;
@Mock @Mock
private ProcessDefinitionMapper processDefinitionMapper; private TaskDefinitionService taskDefinitionService;
@Mock @Mock
private ResourceUserMapper resourceUserMapper; private ResourceUserMapper resourceUserMapper;
@ -407,6 +409,9 @@ public class ResourcesServiceTest {
Mockito.when(storageOperate.getFileStatus("/dolphinscheduler/123/resources/ResourcesServiceTest", Mockito.when(storageOperate.getFileStatus("/dolphinscheduler/123/resources/ResourcesServiceTest",
null, "123", null)) null, "123", null))
.thenReturn(getStorageEntityResource()); .thenReturn(getStorageEntityResource());
Mockito.when(taskDefinitionService.queryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag(ReleaseState.OFFLINE,
Flag.YES)).thenReturn(Arrays.asList(new TaskDefinition()));
Result result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResNotExist", "123"); Result result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResNotExist", "123");
logger.info(result.toString()); logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg()); Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg());

11
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -762,4 +762,15 @@ public class TaskDefinitionServiceImplTest {
return processTaskRelationList; return processTaskRelationList;
} }
@Test
public void testQueryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag() {
List<TaskDefinition> tasks = Arrays.asList(new TaskDefinition(), new TaskDefinition());
Mockito.when(taskDefinitionMapper.queryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag(ReleaseState.OFFLINE,
null)).thenReturn(tasks);
List<TaskDefinition> taskDefinitions = taskDefinitionService
.queryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag(ReleaseState.OFFLINE, null);
Assertions.assertEquals(tasks, taskDefinitions);
}
} }

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

@ -697,7 +697,7 @@ public final class Constants {
* schedule timezone * schedule timezone
*/ */
public static final String SCHEDULE_TIMEZONE = "schedule_timezone"; public static final String SCHEDULE_TIMEZONE = "schedule_timezone";
public static final int RESOURCE_FULL_NAME_MAX_LENGTH = 128; public static final int RESOURCE_FULL_NAME_MAX_LENGTH = 255;
/** /**
* tenant * tenant

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -27,7 +27,9 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -259,6 +261,21 @@ public class TaskDefinition {
return taskParamMap; return taskParamMap;
} }
/**
* get task param resource names
*
* @return {@link List}<{@link String}>
*/
public List<String> getTaskParamResourceNames() {
if (StringUtils.isNotEmpty(taskParams)) {
JsonNode resourceList = JSONUtils.parseObject(taskParams).findValue("resourceList");
if (resourceList != null && !resourceList.isNull()) {
return resourceList.findValuesAsText("resourceName");
}
}
return Collections.EMPTY_LIST;
}
public String getDependence() { public String getDependence() {
return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE); return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
} }

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser; import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@ -158,4 +160,14 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
int deleteByBatchCodes(@Param("taskCodeList") List<Long> taskCodeList); int deleteByBatchCodes(@Param("taskCodeList") List<Long> taskCodeList);
void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion); void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion);
/**
* queryAll TaskDefinition
*
* @param releaseState release state
* @param flag task flag
* @return task list
*/
List<TaskDefinition> queryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag(@Param("releaseState") ReleaseState releaseState,
@Param("taskFlag") Flag flag);
} }

20
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -170,4 +170,24 @@
</where> </where>
order by update_time desc, id asc order by update_time desc, id asc
</select> </select>
<select id="queryAllTaskDefinitionByWorkFlowReleaseStateAndTaskFlag" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
SELECT
<include refid="baseSqlV2">
<property name="alias" value="td"/>
</include>
FROM t_ds_process_definition as pd
JOIN t_ds_process_task_relation as ptr on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
JOIN t_ds_task_definition as td on ptr.post_task_code = td.code and ptr.post_task_version = td.version
<where>
<if test="releaseState != null">
and pd.release_state = #{releaseState}
</if>
<if test="taskFlag != null">
and td.flag = #{taskFlag}
</if>
</where>
</select>
</mapper> </mapper>

Loading…
Cancel
Save