From 8895921d87ad34d5dc98f69ab024a28a1e6b1598 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Thu, 23 Mar 2023 22:52:23 +0800 Subject: [PATCH] [Improvement-13651] Remove useless resources_task (#13721) --- .../service/impl/ResourcesServiceImpl.java | 217 ------------------ .../dao/entity/ResourcesTask.java | 56 ----- .../dao/mapper/ResourceTaskMapper.java | 45 ---- .../dao/mapper/ResourceTaskMapper.xml | 58 ----- .../resources/sql/dolphinscheduler_h2.sql | 18 -- .../resources/sql/dolphinscheduler_mysql.sql | 17 -- .../sql/dolphinscheduler_postgresql.sql | 13 -- .../mysql/dolphinscheduler_ddl.sql | 1 - .../service/process/ProcessServiceImpl.java | 80 +------ .../service/process/ProcessServiceTest.java | 6 +- 10 files changed, 4 insertions(+), 507 deletions(-) delete mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ResourcesTask.java delete mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.java delete mode 100644 dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.xml diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index 76c14333eb..cb279395c6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; -import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ResourcesService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.RegexUtils; @@ -44,27 +43,20 @@ import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; -import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.Resource; -import org.apache.dolphinscheduler.dao.entity.ResourcesTask; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; -import org.apache.dolphinscheduler.dao.mapper.ResourceTaskMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.commons.collections4.CollectionUtils; @@ -110,21 +102,9 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe @Autowired private ResourceMapper resourcesMapper; - @Autowired - private ResourceTaskMapper resourceTaskMapper; - - @Autowired - private TaskDefinitionMapper taskDefinitionMapper; - @Autowired private UdfFuncMapper udfFunctionMapper; - @Autowired - private ProcessService processService; - - @Autowired - private ProcessDefinitionService processDefinitionService; - @Autowired private TenantMapper tenantMapper; @@ -475,7 +455,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe } Date now = new Date(); - long originFileSize = resource.getSize(); resource.setAlias(name); resource.setFileName(name); @@ -491,87 +470,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe return result; } - List existResourcesList; - if (resource.isDirectory()) { - existResourcesList = resourceTaskMapper.selectSubfoldersFullNames(originFullName + FOLDER_SEPARATOR); - } else { - existResourcesList = resourceTaskMapper.selectByMap( - Collections.singletonMap("full_name", originFullName)); - } - - if (existResourcesList.size() > 0 && !fullName.equals(originFullName)) { - // check if any related task is online. If it is, it can not be updated. - for (ResourcesTask existResource : existResourcesList) { - int taskId = existResource.getTaskId(); - if (processService.isTaskOnline(taskDefinitionMapper.selectById(taskId).getCode())) { - log.error("can't be updated,because it is used of process definition that's online"); - log.error("resource task relation id:{} is used of task code {}", existResource.getId(), - taskDefinitionMapper.selectById(taskId).getCode()); - putMsg(result, Status.RESOURCE_IS_USED); - return result; - } - } - - for (ResourcesTask existResource : existResourcesList) { - int taskId = existResource.getTaskId(); - long taskCode = taskDefinitionMapper.selectById(taskId).getCode(); - - List processTaskRelation = processTaskRelationMapper.selectByMap( - Collections.singletonMap("post_task_code", taskCode)); - if (processTaskRelation.size() > 0) { - long processDefinitionCode = processTaskRelation.get(0).getProcessDefinitionCode(); - int processDefinitionVersion = processTaskRelation.get(0).getProcessDefinitionVersion(); - List taskRelationList = processTaskRelationMapper.queryByProcessCode( - processTaskRelation.get(0).getProjectCode(), - processDefinitionCode); - - List taskDefinitionLogList = new ArrayList<>(); - - if (taskRelationList.size() > 0) { - ProcessDefinitionLog processDefinition = - processDefinitionLogMapper.queryByDefinitionCodeAndVersion( - processDefinitionCode, processDefinitionVersion); - for (ProcessTaskRelation taskRelation : taskRelationList) { - long taskCodeInProcess = taskRelation.getPostTaskCode(); - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCodeInProcess); - if (taskCodeInProcess == taskCode) { - // originFullName is a prefix if isDirectory is true - taskDefinition.setTaskParams(RemoveResourceFromResourceList(originFullName, - taskDefinition.getTaskParams(), - resource.isDirectory())); - // if isDirectory is true, fullName is the new prefix. we replace old prefix - // of resource fullname with the new prefix. - // if isDirectory is false, fullName is the new path. - taskDefinition.setTaskParams(AddResourceToResourceList(originFullName, - fullName, - existResource.getFullName(), - taskDefinition.getTaskParams(), - resource.isDirectory())); - } - taskDefinitionLogList.add(taskDefinition); - } - - // update workflow & task definition associated to the resource - if (processDefinition != null) { - processDefinitionService.updateProcessDefinition(loginUser, - processDefinition.getProjectCode(), - processDefinition.getName(), - processDefinition.getCode(), - processDefinition.getDescription(), - processDefinition.getGlobalParams(), - processDefinition.getLocations(), - processDefinition.getTimeout(), - tenantCode, - JSONUtils.toJsonString(taskRelationList.toArray()), - JSONUtils.toJsonString(taskDefinitionLogList.toArray()), - "", - processDefinition.getExecutionType()); - } - } - } - } - } - if (file != null) { // fail upload if (!upload(loginUser, fullName, file, type)) { @@ -1049,15 +947,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe resTenantCode, resource.getType()).stream().map(storageEntity -> storageEntity.getFullName()) .collect(Collectors.toList()); - Set resourcesNeedToDeleteSet = new HashSet<>(); String[] allChildrenFullNameArray = allChildren.stream().toArray(String[]::new); - // check before using allChildrenFullNameArray to query full names. - if (allChildrenFullNameArray.length != 0) { - resourcesNeedToDeleteSet.addAll( - resourceTaskMapper.selectBatchFullNames(allChildrenFullNameArray)); - } - // if resource type is UDF,need check whether it is bound by UDF function if (resource.getType() == (ResourceType.UDF)) { List udfFuncs = udfFunctionMapper.listUdfByResourceFullName(allChildrenFullNameArray); @@ -1069,70 +960,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe } } - // delete data in database - if (resourcesNeedToDeleteSet.size() > 0) { - for (ResourcesTask resourcesTask : resourcesNeedToDeleteSet) { - int taskId = resourcesTask.getTaskId(); - if (processService.isTaskOnline(taskDefinitionMapper.selectById(taskId).getCode())) { - log.error("can't be deleted,because it is used of process definition that's online"); - log.error("resource task relation id:{} is used of task code {}", resourcesTask.getId(), - taskDefinitionMapper.selectById(taskId).getCode()); - putMsg(result, Status.RESOURCE_IS_USED); - return result; - } - } - - for (ResourcesTask existResource : resourcesNeedToDeleteSet) { - int taskId = existResource.getTaskId(); - long taskCode = taskDefinitionMapper.selectById(taskId).getCode(); - - // use taskCode to get processDefinitionCode, then get a list of processDefinitionLog. - List processTaskRelation = processTaskRelationMapper.selectByMap( - Collections.singletonMap("post_task_code", taskCode)); - if (processTaskRelation.size() > 0) { - long processDefinitionCode = processTaskRelation.get(0).getProcessDefinitionCode(); - int processDefinitionVersion = processTaskRelation.get(0).getProcessDefinitionVersion(); - List taskRelationList = processTaskRelationMapper.queryByProcessCode( - processTaskRelation.get(0).getProjectCode(), - processDefinitionCode); - - List taskDefinitionLogList = new ArrayList<>(); - - if (taskRelationList.size() > 0) { - ProcessDefinitionLog processDefinition = - processDefinitionLogMapper.queryByDefinitionCodeAndVersion( - processDefinitionCode, processDefinitionVersion); - for (ProcessTaskRelation taskRelation : taskRelationList) { - long taskCodeInProcess = taskRelation.getPostTaskCode(); - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCodeInProcess); - if (taskCodeInProcess == taskCode) { - taskDefinition.setTaskParams(RemoveResourceFromResourceList(existResource.getFullName(), - taskDefinition.getTaskParams(), false)); - } - taskDefinitionLogList.add(taskDefinition); - } - - // update workflow & task definition associated to the resource - if (processDefinition != null) { - processDefinitionService.updateProcessDefinition(loginUser, - processDefinition.getProjectCode(), - processDefinition.getName(), - processDefinition.getCode(), - processDefinition.getDescription(), - processDefinition.getGlobalParams(), - processDefinition.getLocations(), - processDefinition.getTimeout(), - tenantCode, - JSONUtils.toJsonString(taskRelationList.toArray()), - JSONUtils.toJsonString(taskDefinitionLogList.toArray()), - "", - processDefinition.getExecutionType()); - } - } - } - } - } - // delete file on hdfs,S3 storageOperate.delete(fullName, allChildren, true); @@ -1169,50 +996,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe return taskParameter; } - private String AddResourceToResourceList(String oldPrefix, String newPrefix, String resFullName, - String taskParameter, boolean isDir) { - Map taskParameters = JSONUtils.parseObject( - taskParameter, - new TypeReference>() { - }); - - if (taskParameters.containsKey("resourceList")) { - String resourceListStr = JSONUtils.toJsonString(taskParameters.get("resourceList")); - List resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class); - - // add updated resource to replace the original resource. - ResourceInfo newResource = new ResourceInfo(); - if (isDir) { - // we add spearator here because we dont want rare cases like - // oldFullName: .../folderToDelete and a resource path: .../folderToDeleteAnotherFolder - // Therefore, we make sure the oldFullName has a format of .../folderToDelete/ when - // modifying resourceFullNames in taskDefinition. - String oldFullNameWSeparator = oldPrefix + FOLDER_SEPARATOR; - String newFullNameWSpearator = newPrefix + FOLDER_SEPARATOR; - - newResource.setResourceName(resFullName.replace(oldFullNameWSeparator, newFullNameWSpearator)); - } else { - newResource.setResourceName(newPrefix); - } - resourceInfos.add(newResource); - - taskParameters.put("resourceList", resourceInfos); - - return JSONUtils.toJsonString(taskParameters); - } - return taskParameter; - } - - private String RemoveResourceFromIdsNew(int idToDelete, String idNews) { - - String[] resourceIds = idNews.split(","); - Set resourceIdSet = Arrays.stream(resourceIds) - .map(Integer::parseInt) - .filter(integerId -> !integerId.equals(idToDelete)) - .collect(Collectors.toSet()); - return Joiner.on(",").join(resourceIdSet); - } - /** * verify resource by name and type * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ResourcesTask.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ResourcesTask.java deleted file mode 100644 index 799aa3e901..0000000000 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ResourcesTask.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.dolphinscheduler.dao.entity; - -import org.apache.dolphinscheduler.spi.enums.ResourceType; - -import lombok.Data; - -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; - -@Data -@TableName("t_ds_relation_resources_task") -public class ResourcesTask { - - @TableId(value = "id", type = IdType.AUTO) - private Integer id; - - private String fullName; - - private int taskId; - - private ResourceType type; - - public ResourcesTask(int id, String fullName, int taskId, ResourceType type) { - this.id = id; - this.fullName = fullName; - this.taskId = taskId; - this.type = type; - } - - public ResourcesTask(int taskId, String fullName, ResourceType type) { - this.taskId = taskId; - this.fullName = fullName; - this.type = type; - } -} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.java deleted file mode 100644 index bdd1542c49..0000000000 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.dolphinscheduler.dao.mapper; - -import org.apache.dolphinscheduler.dao.entity.ResourcesTask; - -import org.apache.ibatis.annotations.Param; - -import java.util.List; - -import com.baomidou.mybatisplus.core.mapper.BaseMapper; - -/** - * resource task relation mapper interface - */ -public interface ResourceTaskMapper extends BaseMapper { - - Integer existResourceByTaskIdNFullName(@Param("taskId") int task_id, @Param("fullName") String fullName); - - int deleteIds(@Param("resIds") Integer[] resIds); - - int updateResource(@Param("id") int id, @Param("fullName") String fullName); - - List selectBatchFullNames(@Param("fullNameArr") String[] fullNameArr); - - List selectSubfoldersFullNames(@Param("folderPath") String folderPath); -} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.xml deleted file mode 100644 index f5131405de..0000000000 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - - - - ${alias}.id, ${alias}.full_name, ${alias}.task_id, ${alias}.type - - - - - - - UPDATE t_ds_relation_resources_task SET full_name=#{fullName} WHERE id=#{id} - - - - delete from t_ds_relation_resources_task where id in - - #{i} - - - - - \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index b54dfd07d6..58844f59c5 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -798,24 +798,6 @@ CREATE TABLE t_ds_resources -- Records of t_ds_resources -- ---------------------------- --- ---------------------------- --- Table structure for t_ds_relation_resources_task --- ---------------------------- -DROP TABLE IF EXISTS t_ds_relation_resources_task CASCADE; -CREATE TABLE t_ds_relation_resources_task -( - id int(11) NOT NULL AUTO_INCREMENT, - task_id int(11) DEFAULT NULL, - full_name varchar(255) DEFAULT NULL, - type tinyint(4) DEFAULT NULL, - PRIMARY KEY (id), - UNIQUE KEY t_ds_relation_resources_task_un (task_id, full_name) -) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; - --- ---------------------------- --- Records of t_ds_relation_resources_task --- ---------------------------- - -- ---------------------------- -- Table structure for t_ds_schedules -- ---------------------------- diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 1955d52e64..e0bc9316c6 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -796,23 +796,6 @@ CREATE TABLE `t_ds_resources` ( -- Records of t_ds_resources -- ---------------------------- --- ---------------------------- --- Table structure for t_ds_relation_resources_task --- ---------------------------- -DROP TABLE IF EXISTS `t_ds_relation_resources_task`; -CREATE TABLE `t_ds_relation_resources_task` ( - `id` int NOT NULL AUTO_INCREMENT COMMENT 'key', - `task_id` int(11) DEFAULT NULL COMMENT 'task id', - `full_name` varchar(255) DEFAULT NULL, - `type` tinyint DEFAULT NULL COMMENT 'resource type,0:FILE,1:UDF', - PRIMARY KEY (`id`), - UNIQUE KEY `t_ds_relation_resources_task_un` (`task_id`, `full_name`) -) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin; - --- ---------------------------- --- Records of t_ds_relation_resources_task --- ---------------------------- - -- ---------------------------- -- Table structure for t_ds_schedules -- ---------------------------- diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index f3e6e43e64..9c5f60a372 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -705,19 +705,6 @@ CREATE TABLE t_ds_resources ( CONSTRAINT t_ds_resources_un UNIQUE (full_name, type) ) ; --- --- Table structure for table t_ds_relation_resources_task --- -DROP TABLE IF EXISTS t_ds_relation_resources_task; -CREATE TABLE t_ds_relation_resources_task ( - id SERIAL NOT NULL, - task_id int DEFAULT NULL, - full_name varchar(255) DEFAULT NULL, - type int DEFAULT NULL, - PRIMARY KEY (id), - CONSTRAINT t_ds_relation_resources_task_un UNIQUE (task_id, full_name) -); - -- -- Table structure for table t_ds_schedules -- diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql index 5c91090c3d..44b545c2bd 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql @@ -319,7 +319,6 @@ alter table t_ds_relation_project_user CONVERT TO CHARACTER SET utf8 COLLATE utf alter table t_ds_relation_resources_user CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin; alter table t_ds_relation_udfs_user CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin; alter table t_ds_resources CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin; -alter table t_ds_relation_resources_task CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin; alter table t_ds_schedules CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin; alter table t_ds_session CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin; alter table t_ds_task_instance CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 2863274708..b38bff49d4 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -69,7 +69,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Resource; -import org.apache.dolphinscheduler.dao.entity.ResourcesTask; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; @@ -98,7 +97,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; -import org.apache.dolphinscheduler.dao.mapper.ResourceTaskMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; @@ -148,7 +146,6 @@ import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -232,9 +229,6 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private ResourceMapper resourceMapper; - @Autowired - private ResourceTaskMapper resourceTaskMapper; - @Autowired private ResourceUserMapper resourceUserMapper; @@ -1414,13 +1408,9 @@ public class ProcessServiceImpl implements ProcessService { return new ResourceInfo(); } resourceInfo = new ResourceInfo(); - // get resource from database, only one resource should be returned - Integer resultList = resourceTaskMapper.existResourceByTaskIdNFullName(task_id, resourceFullName); - if (resultList != null) { - resourceInfo.setId(resultList); - resourceInfo.setRes(res.getRes()); - resourceInfo.setResourceName(resourceFullName); - } + resourceInfo.setId(-1); + resourceInfo.setRes(res.getRes()); + resourceInfo.setResourceName(resourceFullName); log.info("updated resource info {}", JSONUtils.toJsonString(resourceInfo)); } @@ -2061,42 +2051,9 @@ public class ProcessServiceImpl implements ProcessService { if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) { updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); - - for (TaskDefinitionLog newTaskDefinitionLog : newTaskDefinitionLogs) { - Set resourceFullNameSet = getResourceFullNames(newTaskDefinitionLog); - for (String resourceFullName : resourceFullNameSet) { - List taskDefinitionList = taskDefinitionMapper.selectByMap( - Collections.singletonMap("code", newTaskDefinitionLog.getCode())); - if (taskDefinitionList.size() > 0) { - createRelationTaskResourcesIfNotExist( - taskDefinitionList.get(0).getId(), resourceFullName); - } - } - } - } if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) { for (TaskDefinitionLog taskDefinitionLog : updateTaskDefinitionLogs) { - Set resourceFullNameSet = getResourceFullNames(taskDefinitionLog); - - // remove resources that user deselected. - for (ResourcesTask resourcesTask : resourceTaskMapper.selectByMap( - Collections.singletonMap("task_id", - taskDefinitionMapper.queryByCode(taskDefinitionLog.getCode()).getId()))) { - if (!resourceFullNameSet.contains(resourcesTask.getFullName())) { - resourceTaskMapper.deleteById(resourcesTask.getId()); - } - } - - for (String resourceFullName : resourceFullNameSet) { - List taskDefinitionList = taskDefinitionMapper.selectByMap( - Collections.singletonMap("code", taskDefinitionLog.getCode())); - if (taskDefinitionList.size() > 0) { - createRelationTaskResourcesIfNotExist( - taskDefinitionList.get(0).getId(), resourceFullName); - } - } - updateResult += taskDefinitionMapper.updateById(taskDefinitionLog); } } @@ -2717,35 +2674,4 @@ public class ProcessServiceImpl implements ProcessService { triggerRelationService.saveCommandTrigger(commandId, processInstanceId); } - private Set getResourceFullNames(TaskDefinition taskDefinition) { - Set resourceFullNames = null; - AbstractParameters params = taskPluginManager.getParameters(ParametersNode.builder() - .taskType(taskDefinition.getTaskType()).taskParams(taskDefinition.getTaskParams()).build()); - - if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) { - resourceFullNames = params.getResourceFilesList().stream() - .filter(t -> !StringUtils.isBlank(t.getResourceName())) - .map(ResourceInfo::getResourceName) - .collect(toSet()); - } - - if (CollectionUtils.isEmpty(resourceFullNames)) { - return new HashSet(); - } - - return resourceFullNames; - } - - private Integer createRelationTaskResourcesIfNotExist(int taskId, String resourceFullName) { - - Integer resourceId = resourceTaskMapper.existResourceByTaskIdNFullName(taskId, resourceFullName); - if (null == resourceId) { - // create the relation if not exist - ResourcesTask resourcesTask = new ResourcesTask(taskId, resourceFullName, ResourceType.FILE); - resourceTaskMapper.insert(resourcesTask); - return resourcesTask.getId(); - } - - return resourceId; - } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 65b67c09fe..fdaf75ab90 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -59,7 +59,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; -import org.apache.dolphinscheduler.dao.mapper.ResourceTaskMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; @@ -154,8 +153,6 @@ public class ProcessServiceTest { @Mock private ResourceMapper resourceMapper; @Mock - private ResourceTaskMapper resourceTaskMapper; - @Mock private TaskGroupMapper taskGroupMapper; @Mock private DataSourceMapper dataSourceMapper; @@ -720,11 +717,10 @@ public class ProcessServiceTest { resourceInfoNormal.setId(1); resourceInfoNormal.setRes("test.txt"); resourceInfoNormal.setResourceName("/test.txt"); - Mockito.when(resourceTaskMapper.existResourceByTaskIdNFullName(0, "/test.txt")).thenReturn(1); ResourceInfo updatedResourceInfo3 = processService.updateResourceInfo(0, resourceInfoNormal); - Assertions.assertEquals(1, updatedResourceInfo3.getId().intValue()); + Assertions.assertEquals(-1, updatedResourceInfo3.getId().intValue()); Assertions.assertEquals("test.txt", updatedResourceInfo3.getRes()); Assertions.assertEquals("/test.txt", updatedResourceInfo3.getResourceName());