diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java index 35ed5fc617..ae6395557d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java @@ -20,9 +20,11 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.ProcessLineage; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; @@ -45,6 +47,9 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF @Autowired private WorkFlowLineageMapper workFlowLineageMapper; + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + @Autowired private ProjectMapper projectMapper; @@ -58,56 +63,71 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF return result; } - private void getWorkFlowRelationRecursion(Set ids, List workFlowRelations, Set sourceIds) { - for (int id : ids) { - sourceIds.addAll(ids); - List workFlowRelationsTmp = workFlowLineageMapper.querySourceTarget(id); - if (workFlowRelationsTmp != null && !workFlowRelationsTmp.isEmpty()) { - Set idsTmp = new HashSet<>(); - for (WorkFlowRelation workFlowRelation : workFlowRelationsTmp) { - if (!sourceIds.contains(workFlowRelation.getTargetWorkFlowId())) { - idsTmp.add(workFlowRelation.getTargetWorkFlowId()); + private void getRelation(Map workFlowLineageMap, + Set workFlowRelations, + ProcessLineage processLineage) { + List relations = workFlowLineageMapper.queryCodeRelation( + processLineage.getPostTaskCode(), processLineage.getPostTaskVersion() + , processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); + + for (ProcessLineage relation : relations) { + if (relation.getProcessDefinitionCode() != null) { + + relation.setPreTaskCode(processLineage.getPostTaskCode()); + relation.setPreTaskVersion(processLineage.getPostTaskVersion()); + + WorkFlowLineage pre = workFlowLineageMapper + .queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); + // sourceWorkFlowId = "" + if (!workFlowLineageMap.containsKey(pre.getWorkFlowId())) { + workFlowLineageMap.put(pre.getWorkFlowId(), pre); + } + + WorkFlowLineage post = workFlowLineageMapper + .queryWorkFlowLineageByCode(relation.getProcessDefinitionCode(), relation.getProjectCode()); + + if (workFlowLineageMap.containsKey(post.getWorkFlowId())) { + WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowId()); + String sourceWorkFlowId = workFlowLineage.getSourceWorkFlowId(); + if (sourceWorkFlowId.equals("")) { + workFlowLineage.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId())); + } else { + workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + "," + pre.getWorkFlowId()); } + + } else { + post.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId())); + workFlowLineageMap.put(post.getWorkFlowId(), post); + } + + WorkFlowRelation workFlowRelation = new WorkFlowRelation(); + workFlowRelation.setSourceWorkFlowId(pre.getWorkFlowId()); + workFlowRelation.setTargetWorkFlowId(post.getWorkFlowId()); + if (workFlowRelations.contains(workFlowRelation)) { + continue; } - workFlowRelations.addAll(workFlowRelationsTmp); - getWorkFlowRelationRecursion(idsTmp, workFlowRelations, sourceIds); + workFlowRelations.add(workFlowRelation); + getRelation(workFlowLineageMap, workFlowRelations, relation); } } + } @Override public Map queryWorkFlowLineageByIds(Set ids, int projectId) { Map result = new HashMap<>(); Project project = projectMapper.selectById(projectId); - List workFlowLineageList = workFlowLineageMapper.queryByIds(ids, project.getCode()); - Map workFlowLists = new HashMap<>(); - Set idsV = new HashSet<>(); - if (ids == null || ids.isEmpty()) { - for (WorkFlowLineage workFlowLineage : workFlowLineageList) { - idsV.add(workFlowLineage.getWorkFlowId()); - } - } else { - idsV = ids; - } - List workFlowRelations = new ArrayList<>(); - Set sourceIds = new HashSet<>(); - getWorkFlowRelationRecursion(idsV, workFlowRelations, sourceIds); - - Set idSet = new HashSet<>(); - //If the incoming parameter is not empty, you need to add downstream workflow detail attributes - if (ids != null && !ids.isEmpty()) { - for (WorkFlowRelation workFlowRelation : workFlowRelations) { - idSet.add(workFlowRelation.getTargetWorkFlowId()); - } - for (int id : ids) { - idSet.remove(id); - } - if (!idSet.isEmpty()) { - workFlowLineageList.addAll(workFlowLineageMapper.queryByIds(idSet, project.getCode())); - } + List processLineages = workFlowLineageMapper.queryRelationByIds(ids, project.getCode()); + + Map workFlowLineages = new HashMap<>(); + Set workFlowRelations = new HashSet<>(); + + for (ProcessLineage processLineage : processLineages) { + getRelation(workFlowLineages, workFlowRelations, processLineage); } - workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineageList); + Map workFlowLists = new HashMap<>(); + workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages.values()); workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations); result.put(Constants.DATA_LIST, workFlowLists); putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java index d9462630bf..db75a43a48 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java @@ -66,8 +66,6 @@ public class WorkFlowLineageServiceTest { ids.add(1); ids.add(2); - when(workFlowLineageMapper.queryByIds(ids, 1L)).thenReturn(getWorkFlowLineages()); - when(workFlowLineageMapper.querySourceTarget(1)).thenReturn(getWorkFlowRelation()); Map result = workFlowLineageService.queryWorkFlowLineageByIds(ids,1); Map workFlowLists = (Map)result.get(Constants.DATA_LIST); List workFlowLineages = (List)workFlowLists.get("workFlowList"); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java new file mode 100644 index 0000000000..678db4289e --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +/** + * Process lineage + */ +public class ProcessLineage { + + /** + * project code + */ + private Long projectCode; + + /** + * post task code + */ + private Long postTaskCode; + + /** + * post task version + */ + private int postTaskVersion; + + /** + * pre task code + */ + private Long preTaskCode; + + /** + * pre task version + */ + private int preTaskVersion; + + /** + * process definition code + */ + private Long processDefinitionCode; + + /** + * process definition version + */ + private int processDefinitionVersion; + + public Long getProjectCode() { + return projectCode; + } + + public void setProjectCode(Long projectCode) { + this.projectCode = projectCode; + } + + public Long getProcessDefinitionCode() { + return processDefinitionCode; + } + + public void setProcessDefinitionCode(Long processDefinitionCode) { + this.processDefinitionCode = processDefinitionCode; + } + + public int getProcessDefinitionVersion() { + return processDefinitionVersion; + } + + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; + } + + public void setPostTaskCode(Long postTaskCode) { + this.postTaskCode = postTaskCode; + } + + public Long getPreTaskCode() { + return preTaskCode; + } + + public void setPreTaskCode(Long preTaskCode) { + this.preTaskCode = preTaskCode; + } + + public int getPreTaskVersion() { + return preTaskVersion; + } + + public void setPreTaskVersion(int preTaskVersion) { + this.preTaskVersion = preTaskVersion; + } + + public int getPostTaskVersion() { + return postTaskVersion; + } + + public void setPostTaskVersion(int postTaskVersion) { + this.postTaskVersion = postTaskVersion; + } + + public long getPostTaskCode() { + return postTaskCode; + } + + public void setPostTaskCode(long postTaskCode) { + this.postTaskCode = postTaskCode; + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java index aad2b5699e..2afeb68339 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java @@ -17,21 +17,14 @@ package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.enums.ConditionType; + +import java.util.Date; + import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.fasterxml.jackson.annotation.JsonFormat; -import org.apache.dolphinscheduler.common.enums.ConditionType; -import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.utils.JSONUtils; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; /** * process task relation diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java index c03c68eb0d..3a74d800d4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.dao.entity; +import java.util.Objects; + public class WorkFlowRelation { private int sourceWorkFlowId; private int targetWorkFlowId; @@ -35,4 +37,16 @@ public class WorkFlowRelation { public void setTargetWorkFlowId(int targetWorkFlowId) { this.targetWorkFlowId = targetWorkFlowId; } + + @Override + public boolean equals(Object obj) { + return obj instanceof WorkFlowRelation + && this.sourceWorkFlowId == ((WorkFlowRelation) obj).getSourceWorkFlowId() + && this.targetWorkFlowId == ((WorkFlowRelation) obj).getTargetWorkFlowId(); + } + + @Override + public int hashCode() { + return Objects.hash(sourceWorkFlowId, targetWorkFlowId); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java index 14096548ce..14b0ee1481 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java @@ -16,9 +16,11 @@ */ package org.apache.dolphinscheduler.dao.mapper; +import org.apache.dolphinscheduler.dao.entity.ProcessLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; -import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; + import org.apache.ibatis.annotations.Param; + import java.util.List; import java.util.Set; @@ -26,24 +28,41 @@ public interface WorkFlowLineageMapper { /** * queryByName + * * @param searchVal searchVal * @param projectCode projectCode - * @return WorkFlowLineage list + * @return WorkFlowLineage list */ List queryByName(@Param("searchVal") String searchVal, @Param("projectCode") Long projectCode); /** - * queryByIds + * queryCodeRelation + * + * @param taskCode taskCode + * @param taskVersion taskVersion + * @param processDefinitionCode processDefinitionCode + * @return ProcessLineage + */ + List queryCodeRelation( + @Param("taskCode") Long taskCode, @Param("taskVersion") int taskVersion, + @Param("processDefinitionCode") Long processDefinitionCode, @Param("projectCode") Long projectCode); + + /** + * queryRelationByIds + * * @param ids ids * @param projectCode projectCode - * @return WorkFlowLineage list + * @return ProcessLineage */ - List queryByIds(@Param("ids") Set ids, @Param("projectCode") Long projectCode); + List queryRelationByIds(@Param("ids") Set ids, @Param("projectCode") Long projectCode); /** - * query SourceTarget - * @param id id - * @return WorkFlowRelation list + * queryWorkFlowLineageByCode + * + * @param processDefinitionCode processDefinitioncode + * @param projectCode projectCode + * @return WorkFlowLineage */ - List querySourceTarget(@Param("id") int id); + WorkFlowLineage queryWorkFlowLineageByCode(@Param("processDefinitionCode") Long processDefinitionCode, @Param("projectCode") Long projectCode); + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index 897e1cafa7..a1dbeef6a9 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -18,86 +18,65 @@ + - + select ptr.project_code, + ptr.post_task_code, + ptr.post_task_version, + ptr.pre_task_code, + ptr.pre_task_version, + ptr.process_definition_code, + ptr.process_definition_version + from t_ds_process_definition pd + join t_ds_process_task_relation ptr on pd.code = ptr.process_definition_code and pd.version = + ptr.process_definition_version + where pd.project_code = #{projectCode} - and tepd.id in + and pd.id in #{i} - + - select id as target_work_flow_id,#{id} as source_work_flow_id - from t_ds_process_definition t - where json_extract(t.process_definition_json, '$**.dependItemList') is not null - and find_in_set(#{id}, replace(replace(replace(json_extract(t.process_definition_json, '$**.definitionId'), '[', ''),']', ''), ' ', '')) > 0 - + select tepd.id as work_flow_id,tepd.name as work_flow_name, + "" as source_work_flow_id, + tepd.release_state as work_flow_publish_status, + tes.start_time as schedule_start_time, + tes.end_time as schedule_end_time, + tes.crontab as crontab, + tes.release_state as schedule_publish_status + from t_ds_process_definition tepd + left join t_ds_schedules tes on tepd.id = tes.process_definition_id + where tepd.project_code = #{projectCode} and tepd.code = #{processDefinitionCode} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java index d9c2c7b115..909d71354c 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java @@ -44,19 +44,4 @@ public class WorkFlowLineageMapperTest { List workFlowLineages = workFlowLineageMapper.queryByName("test",1L); Assert.assertNotEquals(workFlowLineages.size(), 0); } - - - @Test - public void testQueryByIds() { - Set ids = new HashSet<>(); - ids.add(1); - List workFlowLineages = workFlowLineageMapper.queryByIds(ids,1L); - Assert.assertNotEquals(workFlowLineages.size(), 0); - } - - @Test - public void testQuerySourceTarget() { - List workFlowRelations = workFlowLineageMapper.querySourceTarget(1); - Assert.assertNotEquals(workFlowRelations.size(), 0); - } }