From 2ab8c1ca7d820c8921fa062b0757c9b70b00048c Mon Sep 17 00:00:00 2001 From: xiangzihao <460888207@qq.com> Date: Fri, 4 Mar 2022 14:05:59 +0800 Subject: [PATCH] [Fix-8616][WorkFlowLineage] work flow lineage search result missing data (#8684) * fix bug_8616 * remove meaningless query column * fix code smell * fix remaining problems --- .../service/impl/TaskGroupServiceImpl.java | 1 - .../impl/WorkFlowLineageServiceImpl.java | 78 ++++++++++++------- .../dao/mapper/WorkFlowLineageMapper.java | 27 ++++++- .../dao/mapper/WorkFlowLineageMapper.xml | 56 +++++++++++++ 4 files changed, 130 insertions(+), 32 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java index 9177ae336e..8bd96b5dd6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java @@ -216,7 +216,6 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe pageInfo.setTotalList(list); result.put(Constants.DATA_LIST, pageInfo); - logger.info("select result:{}", taskGroupPaging); putMsg(result, Status.SUCCESS); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java index e3ee534c5e..eb3c5b08a5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.model.DependentItem; import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessLineage; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; @@ -36,8 +37,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.curator.shaded.com.google.common.collect.Sets; - import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -80,19 +79,18 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF } @Override - public Map queryWorkFlowLineageByCode(long projectCode, long workFlowCode) { + public Map queryWorkFlowLineageByCode(long projectCode, long sourceWorkFlowCode) { Map result = new HashMap<>(); Project project = projectMapper.queryByCode(projectCode); if (project == null) { putMsg(result, Status.PROJECT_NOT_FOUND, projectCode); return result; } - Map workFlowLineagesMap = new HashMap<>(); + List workFlowLineages = new ArrayList<>(); Set workFlowRelations = new HashSet<>(); - Set sourceWorkFlowCodes = Sets.newHashSet(workFlowCode); - recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations, sourceWorkFlowCodes); + recursiveWorkFlow(projectCode, sourceWorkFlowCode, workFlowLineages, workFlowRelations); Map workFlowLists = new HashMap<>(); - workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineagesMap.values()); + workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages); workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations); result.put(Constants.DATA_LIST, workFlowLists); putMsg(result, Status.SUCCESS); @@ -100,31 +98,51 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF } private void recursiveWorkFlow(long projectCode, - Map workFlowLineagesMap, - Set workFlowRelations, - Set sourceWorkFlowCodes) { - for (Long workFlowCode : sourceWorkFlowCodes) { - WorkFlowLineage workFlowLineage = workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode); - workFlowLineagesMap.put(workFlowCode, workFlowLineage); - List processLineages = workFlowLineageMapper.queryProcessLineageByCode(projectCode, workFlowCode); - List taskDefinitionList = new ArrayList<>(); - for (ProcessLineage processLineage : processLineages) { - if (processLineage.getPreTaskCode() > 0) { - taskDefinitionList.add(new TaskDefinition(processLineage.getPreTaskCode(), processLineage.getPreTaskVersion())); - } - if (processLineage.getPostTaskCode() > 0) { - taskDefinitionList.add(new TaskDefinition(processLineage.getPostTaskCode(), processLineage.getPostTaskVersion())); + long sourceWorkFlowCode, + List workFlowLineages, + Set workFlowRelations) { + workFlowLineages.add(workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode,sourceWorkFlowCode)); + + List downStreamWorkFlowLineages = + workFlowLineageMapper.queryDownstreamLineageByProcessDefinitionCode(sourceWorkFlowCode, "DEPENDENT"); + workFlowLineages.addAll(downStreamWorkFlowLineages); + downStreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations.add(new WorkFlowRelation(sourceWorkFlowCode, workFlowLineage.getWorkFlowCode()))); + + List upstreamWorkFlowLineages = new ArrayList<>(); + getUpstreamLineages(sourceWorkFlowCode, upstreamWorkFlowLineages); + workFlowLineages.addAll(upstreamWorkFlowLineages); + upstreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations.add(new WorkFlowRelation(workFlowLineage.getWorkFlowCode(), sourceWorkFlowCode))); + } + + private void getUpstreamLineages(long sourceWorkFlowCode, + List upstreamWorkFlowLineages) { + List workFlowDependentDefinitionList = + workFlowLineageMapper.queryUpstreamDependentParamsByProcessDefinitionCode(sourceWorkFlowCode, "DEPENDENT"); + + List upstreamProcessDefinitionCodes = new ArrayList<>(); + + getProcessDefinitionCodeByDependentDefinitionList(workFlowDependentDefinitionList, + upstreamProcessDefinitionCodes); + + if (!upstreamProcessDefinitionCodes.isEmpty()) { + upstreamWorkFlowLineages.addAll( + workFlowLineageMapper.queryWorkFlowLineageByProcessDefinitionCodes(upstreamProcessDefinitionCodes)); + } + } + + /** + * get dependent process definition code by dependent process definition list + */ + private void getProcessDefinitionCodeByDependentDefinitionList(List dependentDefinitionList, + List processDefinitionCodes) { + for (DependentProcessDefinition dependentProcessDefinition : dependentDefinitionList) { + for (DependentTaskModel dependentTaskModel : dependentProcessDefinition.getDependentParameters().getDependTaskList()) { + for (DependentItem dependentItem : dependentTaskModel.getDependItemList()) { + if (!processDefinitionCodes.contains(dependentItem.getDefinitionCode())) { + processDefinitionCodes.add(dependentItem.getDefinitionCode()); + } } } - sourceWorkFlowCodes = querySourceWorkFlowCodes(projectCode, workFlowCode, taskDefinitionList); - if (sourceWorkFlowCodes.isEmpty()) { - workFlowRelations.add(new WorkFlowRelation(0L, workFlowCode)); - return; - } else { - workFlowLineagesMap.get(workFlowCode).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes, Constants.COMMA)); - sourceWorkFlowCodes.forEach(code -> workFlowRelations.add(new WorkFlowRelation(code, workFlowCode))); - recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations, sourceWorkFlowCodes); - } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java index 314f542b06..dfdcafa640 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java @@ -45,6 +45,14 @@ public interface WorkFlowLineageMapper { */ WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long projectCode, @Param("workFlowCode") long workFlowCode); + /** + * queryWorkFlowLineageByProcessDefinitionCodes + * + * @param workFlowCodes workFlowCodes + * @return WorkFlowLineage + */ + List queryWorkFlowLineageByProcessDefinitionCodes(@Param("workFlowCodes") List workFlowCodes); + /** * queryWorkFlowLineageByCode * @@ -71,11 +79,28 @@ public interface WorkFlowLineageMapper { List queryProcessLineageByCode(@Param("projectCode") long projectCode, @Param("processDefinitionCode") long processDefinitionCode); - /** * query process definition by name * * @return dependent process definition */ List queryDependentProcessDefinitionByProcessDefinitionCode(@Param("code") long code); + + /** + * query downstream work flow lineage by process definition code + * + * @return dependent process definition + */ + List queryDownstreamLineageByProcessDefinitionCode(@Param("code") long code, + @Param("taskType") String taskType); + + + /** + * query upstream work flow dependent task params by process definition code + * + * @return task_params + */ + List queryUpstreamDependentParamsByProcessDefinitionCode(@Param("code") long code, + @Param("taskType") String taskType); + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index eed6476525..6b5e487c0a 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -42,6 +42,25 @@ where tepd.project_code = #{projectCode} and tepd.code = #{workFlowCode} + + + + + +