Browse Source

[Feature][jsonsplit]modify listResources mothod and remove getResourceIds mothod (#4928)

* Modify Project and ProjectUser Mapper

* Modify Project and ProjectUser Mapper

* project_code is bigint(20)

* modify ERROR name

* modify saveProcessDefine, remove the duplicate code with createTaskAndRelation

* modify import/export processdefinition, add genProcessData

* fix ut and bug

* code style

* repalce project_id with code

* conflicts solve

* conflicts solve

* conflicts solve

* bugfix

* modify listResources mothod and remove getResourceIds mothod

* 1

* conflicts solve

* modify listResources mothod and remove getResourceIds mothod

* modify listResources mothod and remove getResourceIds mothod
pull/3/MERGE
Simon 3 years ago committed by GitHub
parent
commit
0c2b065db0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
  4. 59
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/ResourceProcessDefinitionUtils.java
  5. 26
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
  6. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/ResourceProcessDefinitionUtilsTest.java

54
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -43,8 +43,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
@ -53,7 +51,6 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StreamUtils; import org.apache.dolphinscheduler.common.utils.StreamUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@ -80,7 +77,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -218,45 +214,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
/**
* get resource ids
*
* @param processData process data
* @return resource ids
*/
private String getResourceIds(ProcessData processData) {
List<TaskNode> tasks = processData.getTasks();
Set<Integer> resourceIds = new HashSet<>();
StringBuilder sb = new StringBuilder();
if (CollectionUtils.isEmpty(tasks)) {
return sb.toString();
}
for (TaskNode taskNode : tasks) {
String taskParameter = taskNode.getParams();
AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(), taskParameter);
if (params == null) {
continue;
}
if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
Set<Integer> tempSet = params.getResourceFilesList().
stream()
.filter(t -> t.getId() != 0)
.map(ResourceInfo::getId)
.collect(Collectors.toSet());
resourceIds.addAll(tempSet);
}
}
for (int i : resourceIds) {
if (sb.length() > 0) {
sb.append(",");
}
sb.append(i);
}
return sb.toString();
}
/** /**
* query process definition list * query process definition list
* *
@ -278,7 +235,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(project.getCode()); List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(project.getCode());
resourceList.stream().forEach(processDefinition -> { resourceList.forEach(processDefinition -> {
ProcessData processData = processService.genProcessData(processDefinition); ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
}); });
@ -317,7 +274,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
page, searchVal, userId, project.getCode(), isAdmin(loginUser)); page, searchVal, userId, project.getCode(), isAdmin(loginUser));
List<ProcessDefinition> records = processDefinitionIPage.getRecords(); List<ProcessDefinition> records = processDefinitionIPage.getRecords();
records.stream().forEach(processDefinition -> { records.forEach(processDefinition -> {
ProcessData processData = processService.genProcessData(processDefinition); ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
}); });
@ -1264,7 +1221,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Project project = projectMapper.selectById(projectId); Project project = projectMapper.selectById(projectId);
List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(project.getCode()); List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(project.getCode());
resourceList.stream().forEach(processDefinition -> { resourceList.forEach(processDefinition -> {
ProcessData processData = processService.genProcessData(processDefinition); ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
}); });
@ -1461,9 +1418,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} else { } else {
ProcessData processData = processService.genProcessData(processDefinition); ProcessData processData = processService.genProcessData(processDefinition);
List<TaskNode> taskNodeList = processData.getTasks(); List<TaskNode> taskNodeList = processData.getTasks();
taskNodeList.stream().forEach(taskNode -> { taskNodeList.forEach(taskNode -> taskNode.setCode(0L));
taskNode.setCode(0L);
});
processData.setTasks(taskNodeList); processData.setTasks(taskNodeList);
String processDefinitionJson = JSONUtils.toJsonString(processData); String processDefinitionJson = JSONUtils.toJsonString(processData);
return createProcessDefinition( return createProcessDefinition(
@ -1757,4 +1712,3 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -695,7 +695,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// get all resource id of process definitions those is released // get all resource id of process definitions those is released
List<Map<String, Object>> list = processDefinitionMapper.listResources(); List<Map<String, Object>> list = processDefinitionMapper.listResources();
Map<Integer, Set<Integer>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list); Map<Integer, Set<Long>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
Set<Integer> resourceIdSet = resourceProcessMap.keySet(); Set<Integer> resourceIdSet = resourceProcessMap.keySet();
// get all children of the resource // get all children of the resource
List<Integer> allChildren = listAllChildren(resource,true); List<Integer> allChildren = listAllChildren(resource,true);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java

@ -597,7 +597,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
// get all resource id of process definitions those is released // get all resource id of process definitions those is released
List<Map<String, Object>> list = processDefinitionMapper.listResourcesByUser(userId); List<Map<String, Object>> list = processDefinitionMapper.listResourcesByUser(userId);
Map<Integer, Set<Integer>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list); Map<Integer, Set<Long>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
Set<Integer> resourceIdSet = resourceProcessMap.keySet(); Set<Integer> resourceIdSet = resourceProcessMap.keySet();
resourceIdSet.retainAll(oldAuthorizedResIds); resourceIdSet.retainAll(oldAuthorizedResIds);

59
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/ResourceProcessDefinitionUtils.java

@ -18,7 +18,12 @@ package org.apache.dolphinscheduler.dao.utils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.*; import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -26,35 +31,37 @@ import java.util.stream.Collectors;
*/ */
public class ResourceProcessDefinitionUtils { public class ResourceProcessDefinitionUtils {
/** /**
* get resource process map key is resource id,value is the set of process definition * get resource process map key is resource id,value is the set of process definition code
* @param list the map key is process definition id and value is resource_ids *
* @return resource process definition map * @param resourceList the map key is process definition code and value is resource_ids
* @return resource process definition map (resourceId -> processDefinitionCodes)
*/ */
public static Map<Integer, Set<Integer>> getResourceProcessDefinitionMap(List<Map<String, Object>> list) { public static Map<Integer, Set<Long>> getResourceProcessDefinitionMap(List<Map<String, Object>> resourceList) {
Map<Integer, String> map = new HashMap<>();
Map<Integer, Set<Integer>> result = new HashMap<>(); // resourceId -> processDefinitionCodes
if (CollectionUtils.isNotEmpty(list)) { Map<Integer, Set<Long>> resourceResult = new HashMap<>();
for (Map<String, Object> tempMap : list) {
map.put((Integer) tempMap.get("id"), (String)tempMap.get("resource_ids"));
}
}
for (Map.Entry<Integer, String> entry : map.entrySet()) { if (CollectionUtils.isNotEmpty(resourceList)) {
Integer mapKey = entry.getKey(); for (Map<String, Object> resourceMap : resourceList) {
String[] arr = entry.getValue().split(","); Long code = (Long) resourceMap.get("code");
Set<Integer> mapValues = Arrays.stream(arr).map(Integer::parseInt).collect(Collectors.toSet()); String[] resourceIds = ((String) resourceMap.get("resource_ids"))
for (Integer value : mapValues) { .split(",");
if (result.containsKey(value)) {
Set<Integer> set = result.get(value); Set<Integer> resourceIdSet = Arrays.stream(resourceIds).map(Integer::parseInt).collect(Collectors.toSet());
set.add(mapKey); for (Integer resourceId : resourceIdSet) {
result.put(value, set); Set<Long> codeSet;
} else { if (resourceResult.containsKey(resourceId)) {
Set<Integer> set = new HashSet<>(); codeSet = resourceResult.get(resourceId);
set.add(mapKey); } else {
result.put(value, set); codeSet = new HashSet<>();
}
codeSet.add(code);
resourceResult.put(resourceId, codeSet);
} }
} }
} }
return result;
return resourceResult;
} }
} }

26
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml

@ -123,6 +123,7 @@
</if> </if>
group by td.user_id,tu.user_name group by td.user_id,tu.user_name
</select> </select>
<select id="queryByDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition"> <select id="queryByDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
SELECT SELECT
pd.id, pd.code, pd.name, pd.version, pd.release_state, pd.project_code, pd.user_id, pd.description, pd.id, pd.code, pd.name, pd.version, pd.release_state, pd.project_code, pd.user_id, pd.description,
@ -137,17 +138,28 @@
AND pd.id = #{processDefineId} AND pd.id = #{processDefineId}
</select> </select>
<select id="listResources" resultType="java.util.HashMap"> <select id="listResources" resultType="java.util.HashMap">
SELECT id,resource_ids SELECT distinct pd.code,td.resource_ids
FROM t_ds_process_definition FROM t_ds_process_task_relation ptr
WHERE release_state = 1 and resource_ids is not null and resource_ids != '' join t_ds_process_definition pd
on ptr.process_definition_code=pd.code and ptr.process_definition_version = pd.version
and ptr.project_code=pd.project_code and pd.release_state = 1
join t_ds_task_definition td
on (ptr.pre_task_code=td.code and ptr.pre_task_version=td.version)
or (ptr.pre_task_code=td.code and ptr.pre_task_version=td.version)
WHERE td.resource_ids is not null and td.resource_ids != ''
</select> </select>
<select id="listResourcesByUser" resultType="java.util.HashMap"> <select id="listResourcesByUser" resultType="java.util.HashMap">
SELECT id,resource_ids SELECT distinct pd.code,td.resource_ids
FROM t_ds_process_definition FROM t_ds_process_task_relation ptr
WHERE user_id = #{userId} and release_state = 1 and resource_ids is not null and resource_ids != '' join t_ds_process_definition pd
on ptr.process_definition_code=pd.code and ptr.process_definition_version = pd.version
and ptr.project_code=pd.project_code and pd.release_state = 1
join t_ds_task_definition td
on (ptr.pre_task_code=td.code and ptr.pre_task_version=td.version)
or (ptr.pre_task_code=td.code and ptr.pre_task_version=td.version)
WHERE td.resource_ids is not null and td.resource_ids != '' and td.user_id = #{userId}
</select> </select>
<select id="listProjectIds" resultType="java.lang.Integer"> <select id="listProjectIds" resultType="java.lang.Integer">

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/ResourceProcessDefinitionUtilsTest.java

@ -32,7 +32,7 @@ public class ResourceProcessDefinitionUtilsTest {
public void getResourceProcessDefinitionMapTest(){ public void getResourceProcessDefinitionMapTest(){
List<Map<String,Object>> mapList = new ArrayList<>(); List<Map<String,Object>> mapList = new ArrayList<>();
Map<String,Object> map = new HashMap(); Map<String,Object> map = new HashMap();
map.put("id",1); map.put("code",1L);
map.put("resource_ids","1,2,3"); map.put("resource_ids","1,2,3");
mapList.add(map); mapList.add(map);
Assert.assertNotNull(ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(mapList)); Assert.assertNotNull(ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(mapList));

Loading…
Cancel
Save