Browse Source

[Improvement-13651] Remove useless resources_task (#13721)

3.2.0-release
Aaron Wang 2 years ago committed by GitHub
parent
commit
8895921d87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 217
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  2. 56
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ResourcesTask.java
  3. 45
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.java
  4. 58
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.xml
  5. 18
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  6. 17
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  7. 13
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  8. 1
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
  9. 80
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  10. 6
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

217
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor
import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.RegexUtils;
@ -44,27 +43,20 @@ import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.ResourcesTask;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceTaskMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections4.CollectionUtils;
@ -110,21 +102,9 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
@Autowired
private ResourceMapper resourcesMapper;
@Autowired
private ResourceTaskMapper resourceTaskMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private UdfFuncMapper udfFunctionMapper;
@Autowired
private ProcessService processService;
@Autowired
private ProcessDefinitionService processDefinitionService;
@Autowired
private TenantMapper tenantMapper;
@ -475,7 +455,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
}
Date now = new Date();
long originFileSize = resource.getSize();
resource.setAlias(name);
resource.setFileName(name);
@ -491,87 +470,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
List<ResourcesTask> existResourcesList;
if (resource.isDirectory()) {
existResourcesList = resourceTaskMapper.selectSubfoldersFullNames(originFullName + FOLDER_SEPARATOR);
} else {
existResourcesList = resourceTaskMapper.selectByMap(
Collections.singletonMap("full_name", originFullName));
}
if (existResourcesList.size() > 0 && !fullName.equals(originFullName)) {
// check if any related task is online. If it is, it can not be updated.
for (ResourcesTask existResource : existResourcesList) {
int taskId = existResource.getTaskId();
if (processService.isTaskOnline(taskDefinitionMapper.selectById(taskId).getCode())) {
log.error("can't be updated,because it is used of process definition that's online");
log.error("resource task relation id:{} is used of task code {}", existResource.getId(),
taskDefinitionMapper.selectById(taskId).getCode());
putMsg(result, Status.RESOURCE_IS_USED);
return result;
}
}
for (ResourcesTask existResource : existResourcesList) {
int taskId = existResource.getTaskId();
long taskCode = taskDefinitionMapper.selectById(taskId).getCode();
List<ProcessTaskRelation> processTaskRelation = processTaskRelationMapper.selectByMap(
Collections.singletonMap("post_task_code", taskCode));
if (processTaskRelation.size() > 0) {
long processDefinitionCode = processTaskRelation.get(0).getProcessDefinitionCode();
int processDefinitionVersion = processTaskRelation.get(0).getProcessDefinitionVersion();
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryByProcessCode(
processTaskRelation.get(0).getProjectCode(),
processDefinitionCode);
List<TaskDefinition> taskDefinitionLogList = new ArrayList<>();
if (taskRelationList.size() > 0) {
ProcessDefinitionLog processDefinition =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processDefinitionCode, processDefinitionVersion);
for (ProcessTaskRelation taskRelation : taskRelationList) {
long taskCodeInProcess = taskRelation.getPostTaskCode();
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCodeInProcess);
if (taskCodeInProcess == taskCode) {
// originFullName is a prefix if isDirectory is true
taskDefinition.setTaskParams(RemoveResourceFromResourceList(originFullName,
taskDefinition.getTaskParams(),
resource.isDirectory()));
// if isDirectory is true, fullName is the new prefix. we replace old prefix
// of resource fullname with the new prefix.
// if isDirectory is false, fullName is the new path.
taskDefinition.setTaskParams(AddResourceToResourceList(originFullName,
fullName,
existResource.getFullName(),
taskDefinition.getTaskParams(),
resource.isDirectory()));
}
taskDefinitionLogList.add(taskDefinition);
}
// update workflow & task definition associated to the resource
if (processDefinition != null) {
processDefinitionService.updateProcessDefinition(loginUser,
processDefinition.getProjectCode(),
processDefinition.getName(),
processDefinition.getCode(),
processDefinition.getDescription(),
processDefinition.getGlobalParams(),
processDefinition.getLocations(),
processDefinition.getTimeout(),
tenantCode,
JSONUtils.toJsonString(taskRelationList.toArray()),
JSONUtils.toJsonString(taskDefinitionLogList.toArray()),
"",
processDefinition.getExecutionType());
}
}
}
}
}
if (file != null) {
// fail upload
if (!upload(loginUser, fullName, file, type)) {
@ -1049,15 +947,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
resTenantCode, resource.getType()).stream().map(storageEntity -> storageEntity.getFullName())
.collect(Collectors.toList());
Set<ResourcesTask> resourcesNeedToDeleteSet = new HashSet<>();
String[] allChildrenFullNameArray = allChildren.stream().toArray(String[]::new);
// check before using allChildrenFullNameArray to query full names.
if (allChildrenFullNameArray.length != 0) {
resourcesNeedToDeleteSet.addAll(
resourceTaskMapper.selectBatchFullNames(allChildrenFullNameArray));
}
// if resource type is UDF,need check whether it is bound by UDF function
if (resource.getType() == (ResourceType.UDF)) {
List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceFullName(allChildrenFullNameArray);
@ -1069,70 +960,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
}
}
// delete data in database
if (resourcesNeedToDeleteSet.size() > 0) {
for (ResourcesTask resourcesTask : resourcesNeedToDeleteSet) {
int taskId = resourcesTask.getTaskId();
if (processService.isTaskOnline(taskDefinitionMapper.selectById(taskId).getCode())) {
log.error("can't be deleted,because it is used of process definition that's online");
log.error("resource task relation id:{} is used of task code {}", resourcesTask.getId(),
taskDefinitionMapper.selectById(taskId).getCode());
putMsg(result, Status.RESOURCE_IS_USED);
return result;
}
}
for (ResourcesTask existResource : resourcesNeedToDeleteSet) {
int taskId = existResource.getTaskId();
long taskCode = taskDefinitionMapper.selectById(taskId).getCode();
// use taskCode to get processDefinitionCode, then get a list of processDefinitionLog.
List<ProcessTaskRelation> processTaskRelation = processTaskRelationMapper.selectByMap(
Collections.singletonMap("post_task_code", taskCode));
if (processTaskRelation.size() > 0) {
long processDefinitionCode = processTaskRelation.get(0).getProcessDefinitionCode();
int processDefinitionVersion = processTaskRelation.get(0).getProcessDefinitionVersion();
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryByProcessCode(
processTaskRelation.get(0).getProjectCode(),
processDefinitionCode);
List<TaskDefinition> taskDefinitionLogList = new ArrayList<>();
if (taskRelationList.size() > 0) {
ProcessDefinitionLog processDefinition =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processDefinitionCode, processDefinitionVersion);
for (ProcessTaskRelation taskRelation : taskRelationList) {
long taskCodeInProcess = taskRelation.getPostTaskCode();
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCodeInProcess);
if (taskCodeInProcess == taskCode) {
taskDefinition.setTaskParams(RemoveResourceFromResourceList(existResource.getFullName(),
taskDefinition.getTaskParams(), false));
}
taskDefinitionLogList.add(taskDefinition);
}
// update workflow & task definition associated to the resource
if (processDefinition != null) {
processDefinitionService.updateProcessDefinition(loginUser,
processDefinition.getProjectCode(),
processDefinition.getName(),
processDefinition.getCode(),
processDefinition.getDescription(),
processDefinition.getGlobalParams(),
processDefinition.getLocations(),
processDefinition.getTimeout(),
tenantCode,
JSONUtils.toJsonString(taskRelationList.toArray()),
JSONUtils.toJsonString(taskDefinitionLogList.toArray()),
"",
processDefinition.getExecutionType());
}
}
}
}
}
// delete file on hdfs,S3
storageOperate.delete(fullName, allChildren, true);
@ -1169,50 +996,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return taskParameter;
}
private String AddResourceToResourceList(String oldPrefix, String newPrefix, String resFullName,
String taskParameter, boolean isDir) {
Map<String, Object> taskParameters = JSONUtils.parseObject(
taskParameter,
new TypeReference<Map<String, Object>>() {
});
if (taskParameters.containsKey("resourceList")) {
String resourceListStr = JSONUtils.toJsonString(taskParameters.get("resourceList"));
List<ResourceInfo> resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class);
// add updated resource to replace the original resource.
ResourceInfo newResource = new ResourceInfo();
if (isDir) {
// we add spearator here because we dont want rare cases like
// oldFullName: .../folderToDelete and a resource path: .../folderToDeleteAnotherFolder
// Therefore, we make sure the oldFullName has a format of .../folderToDelete/ when
// modifying resourceFullNames in taskDefinition.
String oldFullNameWSeparator = oldPrefix + FOLDER_SEPARATOR;
String newFullNameWSpearator = newPrefix + FOLDER_SEPARATOR;
newResource.setResourceName(resFullName.replace(oldFullNameWSeparator, newFullNameWSpearator));
} else {
newResource.setResourceName(newPrefix);
}
resourceInfos.add(newResource);
taskParameters.put("resourceList", resourceInfos);
return JSONUtils.toJsonString(taskParameters);
}
return taskParameter;
}
private String RemoveResourceFromIdsNew(int idToDelete, String idNews) {
String[] resourceIds = idNews.split(",");
Set<Integer> resourceIdSet = Arrays.stream(resourceIds)
.map(Integer::parseInt)
.filter(integerId -> !integerId.equals(idToDelete))
.collect(Collectors.toSet());
return Joiner.on(",").join(resourceIdSet);
}
/**
* verify resource by name and type
*

56
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ResourcesTask.java

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import lombok.Data;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@TableName("t_ds_relation_resources_task")
public class ResourcesTask {
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
private String fullName;
private int taskId;
private ResourceType type;
public ResourcesTask(int id, String fullName, int taskId, ResourceType type) {
this.id = id;
this.fullName = fullName;
this.taskId = taskId;
this.type = type;
}
public ResourcesTask(int taskId, String fullName, ResourceType type) {
this.taskId = taskId;
this.fullName = fullName;
this.type = type;
}
}

45
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.java

@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ResourcesTask;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* resource task relation mapper interface
*/
public interface ResourceTaskMapper extends BaseMapper<ResourcesTask> {
Integer existResourceByTaskIdNFullName(@Param("taskId") int task_id, @Param("fullName") String fullName);
int deleteIds(@Param("resIds") Integer[] resIds);
int updateResource(@Param("id") int id, @Param("fullName") String fullName);
List<ResourcesTask> selectBatchFullNames(@Param("fullNameArr") String[] fullNameArr);
List<ResourcesTask> selectSubfoldersFullNames(@Param("folderPath") String folderPath);
}

58
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceTaskMapper.xml

@ -1,58 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ResourceTaskMapper">
<sql id="baseSqlV2">
${alias}.id, ${alias}.full_name, ${alias}.task_id, ${alias}.type
</sql>
<select id="existResourceByTaskIdNFullName" resultType="java.lang.Integer">
select
id
from t_ds_relation_resources_task
where full_name = #{fullName} and task_id = #{taskId}
</select>
<select id="selectBatchFullNames" resultType="org.apache.dolphinscheduler.dao.entity.ResourcesTask">
select
id, full_name, task_id, type
from t_ds_relation_resources_task
where full_name in
<foreach collection="fullNameArr" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</select>
<update id="updateResource" >
UPDATE t_ds_relation_resources_task SET full_name=#{fullName} WHERE id=#{id}
</update>
<delete id="deleteIds" parameterType="java.lang.Integer">
delete from t_ds_relation_resources_task where id in
<foreach collection="resIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</delete>
<select id="selectSubfoldersFullNames" resultType="org.apache.dolphinscheduler.dao.entity.ResourcesTask">
select
id, full_name, task_id, type
from t_ds_relation_resources_task
where full_name like concat(#{folderPath}, '%')
</select>
</mapper>

18
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -798,24 +798,6 @@ CREATE TABLE t_ds_resources
-- Records of t_ds_resources
-- ----------------------------
-- ----------------------------
-- Table structure for t_ds_relation_resources_task
-- ----------------------------
DROP TABLE IF EXISTS t_ds_relation_resources_task CASCADE;
CREATE TABLE t_ds_relation_resources_task
(
id int(11) NOT NULL AUTO_INCREMENT,
task_id int(11) DEFAULT NULL,
full_name varchar(255) DEFAULT NULL,
type tinyint(4) DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY t_ds_relation_resources_task_un (task_id, full_name)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_ds_relation_resources_task
-- ----------------------------
-- ----------------------------
-- Table structure for t_ds_schedules
-- ----------------------------

17
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -796,23 +796,6 @@ CREATE TABLE `t_ds_resources` (
-- Records of t_ds_resources
-- ----------------------------
-- ----------------------------
-- Table structure for t_ds_relation_resources_task
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_relation_resources_task`;
CREATE TABLE `t_ds_relation_resources_task` (
`id` int NOT NULL AUTO_INCREMENT COMMENT 'key',
`task_id` int(11) DEFAULT NULL COMMENT 'task id',
`full_name` varchar(255) DEFAULT NULL,
`type` tinyint DEFAULT NULL COMMENT 'resource type,0:FILE,1:UDF',
PRIMARY KEY (`id`),
UNIQUE KEY `t_ds_relation_resources_task_un` (`task_id`, `full_name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
-- ----------------------------
-- Records of t_ds_relation_resources_task
-- ----------------------------
-- ----------------------------
-- Table structure for t_ds_schedules
-- ----------------------------

13
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -705,19 +705,6 @@ CREATE TABLE t_ds_resources (
CONSTRAINT t_ds_resources_un UNIQUE (full_name, type)
) ;
--
-- Table structure for table t_ds_relation_resources_task
--
DROP TABLE IF EXISTS t_ds_relation_resources_task;
CREATE TABLE t_ds_relation_resources_task (
id SERIAL NOT NULL,
task_id int DEFAULT NULL,
full_name varchar(255) DEFAULT NULL,
type int DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT t_ds_relation_resources_task_un UNIQUE (task_id, full_name)
);
--
-- Table structure for table t_ds_schedules
--

1
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql

@ -319,7 +319,6 @@ alter table t_ds_relation_project_user CONVERT TO CHARACTER SET utf8 COLLATE utf
alter table t_ds_relation_resources_user CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin;
alter table t_ds_relation_udfs_user CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin;
alter table t_ds_resources CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin;
alter table t_ds_relation_resources_task CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin;
alter table t_ds_schedules CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin;
alter table t_ds_session CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin;
alter table t_ds_task_instance CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin;

80
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -69,7 +69,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.ResourcesTask;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@ -98,7 +97,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceTaskMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -148,7 +146,6 @@ import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -232,9 +229,6 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private ResourceMapper resourceMapper;
@Autowired
private ResourceTaskMapper resourceTaskMapper;
@Autowired
private ResourceUserMapper resourceUserMapper;
@ -1414,13 +1408,9 @@ public class ProcessServiceImpl implements ProcessService {
return new ResourceInfo();
}
resourceInfo = new ResourceInfo();
// get resource from database, only one resource should be returned
Integer resultList = resourceTaskMapper.existResourceByTaskIdNFullName(task_id, resourceFullName);
if (resultList != null) {
resourceInfo.setId(resultList);
resourceInfo.setRes(res.getRes());
resourceInfo.setResourceName(resourceFullName);
}
resourceInfo.setId(-1);
resourceInfo.setRes(res.getRes());
resourceInfo.setResourceName(resourceFullName);
log.info("updated resource info {}",
JSONUtils.toJsonString(resourceInfo));
}
@ -2061,42 +2051,9 @@ public class ProcessServiceImpl implements ProcessService {
if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) {
updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
for (TaskDefinitionLog newTaskDefinitionLog : newTaskDefinitionLogs) {
Set<String> resourceFullNameSet = getResourceFullNames(newTaskDefinitionLog);
for (String resourceFullName : resourceFullNameSet) {
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.selectByMap(
Collections.singletonMap("code", newTaskDefinitionLog.getCode()));
if (taskDefinitionList.size() > 0) {
createRelationTaskResourcesIfNotExist(
taskDefinitionList.get(0).getId(), resourceFullName);
}
}
}
}
if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) {
for (TaskDefinitionLog taskDefinitionLog : updateTaskDefinitionLogs) {
Set<String> resourceFullNameSet = getResourceFullNames(taskDefinitionLog);
// remove resources that user deselected.
for (ResourcesTask resourcesTask : resourceTaskMapper.selectByMap(
Collections.singletonMap("task_id",
taskDefinitionMapper.queryByCode(taskDefinitionLog.getCode()).getId()))) {
if (!resourceFullNameSet.contains(resourcesTask.getFullName())) {
resourceTaskMapper.deleteById(resourcesTask.getId());
}
}
for (String resourceFullName : resourceFullNameSet) {
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.selectByMap(
Collections.singletonMap("code", taskDefinitionLog.getCode()));
if (taskDefinitionList.size() > 0) {
createRelationTaskResourcesIfNotExist(
taskDefinitionList.get(0).getId(), resourceFullName);
}
}
updateResult += taskDefinitionMapper.updateById(taskDefinitionLog);
}
}
@ -2717,35 +2674,4 @@ public class ProcessServiceImpl implements ProcessService {
triggerRelationService.saveCommandTrigger(commandId, processInstanceId);
}
private Set<String> getResourceFullNames(TaskDefinition taskDefinition) {
Set<String> resourceFullNames = null;
AbstractParameters params = taskPluginManager.getParameters(ParametersNode.builder()
.taskType(taskDefinition.getTaskType()).taskParams(taskDefinition.getTaskParams()).build());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceFullNames = params.getResourceFilesList().stream()
.filter(t -> !StringUtils.isBlank(t.getResourceName()))
.map(ResourceInfo::getResourceName)
.collect(toSet());
}
if (CollectionUtils.isEmpty(resourceFullNames)) {
return new HashSet<String>();
}
return resourceFullNames;
}
private Integer createRelationTaskResourcesIfNotExist(int taskId, String resourceFullName) {
Integer resourceId = resourceTaskMapper.existResourceByTaskIdNFullName(taskId, resourceFullName);
if (null == resourceId) {
// create the relation if not exist
ResourcesTask resourcesTask = new ResourcesTask(taskId, resourceFullName, ResourceType.FILE);
resourceTaskMapper.insert(resourcesTask);
return resourcesTask.getId();
}
return resourceId;
}
}

6
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -59,7 +59,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceTaskMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
@ -154,8 +153,6 @@ public class ProcessServiceTest {
@Mock
private ResourceMapper resourceMapper;
@Mock
private ResourceTaskMapper resourceTaskMapper;
@Mock
private TaskGroupMapper taskGroupMapper;
@Mock
private DataSourceMapper dataSourceMapper;
@ -720,11 +717,10 @@ public class ProcessServiceTest {
resourceInfoNormal.setId(1);
resourceInfoNormal.setRes("test.txt");
resourceInfoNormal.setResourceName("/test.txt");
Mockito.when(resourceTaskMapper.existResourceByTaskIdNFullName(0, "/test.txt")).thenReturn(1);
ResourceInfo updatedResourceInfo3 = processService.updateResourceInfo(0, resourceInfoNormal);
Assertions.assertEquals(1, updatedResourceInfo3.getId().intValue());
Assertions.assertEquals(-1, updatedResourceInfo3.getId().intValue());
Assertions.assertEquals("test.txt", updatedResourceInfo3.getRes());
Assertions.assertEquals("/test.txt", updatedResourceInfo3.getResourceName());

Loading…
Cancel
Save