From 4893bef5a79fc022b74f8c273bd8079517726f35 Mon Sep 17 00:00:00 2001 From: longtb <67264931+longtb976@users.noreply.github.com> Date: Tue, 23 Aug 2022 14:27:26 +0800 Subject: [PATCH] [Improvement][TaskInstance] reduce database queries (#11522) * [Improvement][TaskInstance] reduce database queries * Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java Co-authored-by: caishunfeng * [Improvement][TaskInstance] queryByInstanceIdsAndCodes -> queryByProcessInstanceIdsAndTaskCodes Co-authored-by: zhangshunmin Co-authored-by: caishunfeng --- .../impl/ProcessDefinitionServiceImpl.java | 34 +++++++++----- .../impl/ProcessInstanceServiceImpl.java | 46 ++++++++++++------- .../dao/mapper/TaskInstanceMapper.java | 3 ++ .../dao/mapper/TaskInstanceMapper.xml | 18 ++++++++ .../dao/mapper/TaskInstanceMapperTest.java | 21 +++++++++ 5 files changed, 94 insertions(+), 28 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 1c7b2aba27..9cc12a8f8f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -114,7 +114,6 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -722,9 +721,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro List processTaskRelationLogList = processTaskRelationLogMapper .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); if (taskRelationList.size() == processTaskRelationLogList.size()) { - Set taskRelationSet = taskRelationList.stream().collect(Collectors.toSet()); - Set processTaskRelationLogSet = - processTaskRelationLogList.stream().collect(Collectors.toSet()); + Set taskRelationSet = new HashSet<>(taskRelationList); + Set processTaskRelationLogSet = new HashSet<>(processTaskRelationLogList); if (taskRelationSet.size() == processTaskRelationLogSet.size()) { taskRelationSet.removeAll(processTaskRelationLogSet); if (!taskRelationSet.isEmpty()) { @@ -1047,7 +1045,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Override @Transactional public Map importProcessDefinition(User loginUser, long projectCode, MultipartFile file) { - Map result = new HashMap<>(); + Map result; String dagDataScheduleJson = FileUtils.file2String(file); List dagDataScheduleList = JSONUtils.toList(dagDataScheduleJson, DagDataSchedule.class); Project project = projectMapper.queryByCode(projectCode); @@ -1658,7 +1656,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro */ @Override public Map viewTree(User loginUser, long projectCode, long code, Integer limit) { - Map result = new HashMap<>(); + Map result; Project project = projectMapper.queryByCode(projectCode); // check user access for project result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_TREE_VIEW); @@ -1716,9 +1714,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro while (!ServerLifeCycleManager.isStopped()) { Set postNodeList; - Iterator>> iter = runningNodeMap.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry> en = iter.next(); + Set>> entries = runningNodeMap.entrySet(); + List processInstanceIds = processInstanceList.stream() + .limit(limit).map(ProcessInstance::getId).collect(Collectors.toList()); + List nodeCodes = entries.stream().map(e -> Long.parseLong(e.getKey())).collect(Collectors.toList()); + List taskInstances; + if (processInstanceIds.isEmpty() || nodeCodes.isEmpty()) { + taskInstances = Collections.emptyList(); + } else { + taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(processInstanceIds, nodeCodes); + } + for (Map.Entry> en : entries) { String nodeCode = en.getKey(); parentTreeViewDtoList = en.getValue(); @@ -1730,8 +1736,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro // set treeViewDto instances for (int i = limit - 1; i >= 0; i--) { ProcessInstance processInstance = processInstanceList.get(i); - TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(), - Long.parseLong(nodeCode)); + TaskInstance taskInstance = null; + for (TaskInstance instance : taskInstances) { + if (instance.getTaskCode() == Long.parseLong(nodeCode) + && instance.getProcessInstanceId() == processInstance.getId()) { + taskInstance = instance; + break; + } + } if (taskInstance == null) { treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null")); } else { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 833a734377..67ff0edb9b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -780,24 +780,36 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce ganttDto.setTaskNames(nodeList); List taskList = new ArrayList<>(); - for (String node : nodeList) { - TaskInstance taskInstance = - taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, Long.parseLong(node)); - if (taskInstance == null) { - continue; + if (!nodeList.isEmpty()) { + List taskCodes = nodeList.stream().map(Long::parseLong).collect(Collectors.toList()); + List taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes( + Collections.singletonList(processInstanceId), taskCodes + ); + for (String node : nodeList) { + TaskInstance taskInstance = null; + for (TaskInstance instance : taskInstances) { + if (instance.getProcessInstanceId() == processInstanceId + && instance.getTaskCode() == Long.parseLong(node)) { + taskInstance = instance; + break; + } + } + if (taskInstance == null) { + continue; + } + Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); + Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); + Task task = new Task(); + task.setTaskName(taskInstance.getName()); + task.getStartDate().add(startTime.getTime()); + task.getEndDate().add(endTime.getTime()); + task.setIsoStart(startTime); + task.setIsoEnd(endTime); + task.setStatus(taskInstance.getState().toString()); + task.setExecutionDate(taskInstance.getStartTime()); + task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime())); + taskList.add(task); } - Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); - Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); - Task task = new Task(); - task.setTaskName(taskInstance.getName()); - task.getStartDate().add(startTime.getTime()); - task.getEndDate().add(endTime.getTime()); - task.setIsoStart(startTime); - task.setIsoEnd(endTime); - task.setStatus(taskInstance.getState().toString()); - task.setExecutionDate(taskInstance.getStartTime()); - task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime())); - taskList.add(task); } ganttDto.setTasks(taskList); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index ca68dc57e5..2ab78f5151 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -53,6 +53,9 @@ public interface TaskInstanceMapper extends BaseMapper { TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId, @Param("taskCode") Long taskCode); + List queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List processInstanceIds, + @Param("taskCodes") List taskCodes); + Integer countTask(@Param("projectCodes") Long[] projectCodes, @Param("taskIds") int[] taskIds); diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index e09b8aa887..0ba80db75f 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -134,6 +134,24 @@ and flag = 1 limit 1 +