diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java index c37b141671..d665a1a184 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java @@ -20,20 +20,32 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessLineage; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; +import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.lang.StringUtils; +import org.apache.curator.shaded.com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -50,6 +62,9 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF @Autowired private ProjectMapper projectMapper; + @Autowired + private TaskDefinitionLogMapper taskDefinitionLogMapper; + @Override public Map queryWorkFlowLineageByName(long projectCode, String workFlowName) { Map result = new HashMap<>(); @@ -72,12 +87,47 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode); return result; } - WorkFlowLineage workFlowLineage = workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode); - result.put(Constants.DATA_LIST, workFlowLineage); + Map workFlowLineagesMap = new HashMap<>(); + Set workFlowRelations = new HashSet<>(); + Set sourceWorkFlowCodes = Sets.newHashSet(workFlowCode); + recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations, sourceWorkFlowCodes); + Map workFlowLists = new HashMap<>(); + workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineagesMap.values()); + workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations); + result.put(Constants.DATA_LIST, workFlowLists); putMsg(result, Status.SUCCESS); return result; } + private void recursiveWorkFlow(long projectCode, + Map workFlowLineagesMap, + Set workFlowRelations, + Set sourceWorkFlowCodes) { + for (Long workFlowCode : sourceWorkFlowCodes) { + WorkFlowLineage workFlowLineage = workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode); + workFlowLineagesMap.put(workFlowCode, workFlowLineage); + List processLineages = workFlowLineageMapper.queryProcessLineageByCode(projectCode, workFlowCode); + List taskDefinitionList = new ArrayList<>(); + for (ProcessLineage processLineage : processLineages) { + if (processLineage.getPreTaskCode() > 0) { + taskDefinitionList.add(new TaskDefinition(processLineage.getPreTaskCode(), processLineage.getPreTaskVersion())); + } + if (processLineage.getPostTaskCode() > 0) { + taskDefinitionList.add(new TaskDefinition(processLineage.getPostTaskCode(), processLineage.getPostTaskVersion())); + } + } + sourceWorkFlowCodes = querySourceWorkFlowCodes(projectCode, workFlowCode, taskDefinitionList); + if (sourceWorkFlowCodes.isEmpty()) { + workFlowRelations.add(new WorkFlowRelation(0L, workFlowCode)); + return; + } else { + workFlowLineagesMap.get(workFlowCode).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes, Constants.COMMA)); + sourceWorkFlowCodes.forEach(code -> workFlowRelations.add(new WorkFlowRelation(code, workFlowCode))); + recursiveWorkFlow(projectCode, workFlowLineagesMap, workFlowRelations, sourceWorkFlowCodes); + } + } + } + @Override public Map queryWorkFlowLineage(long projectCode) { Map result = new HashMap<>(); @@ -87,58 +137,65 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF return result; } List processLineages = workFlowLineageMapper.queryProcessLineage(projectCode); - - Map workFlowLineages = new HashMap<>(); + Map workFlowLineagesMap = new HashMap<>(); Set workFlowRelations = new HashSet<>(); - - for (ProcessLineage processLineage : processLineages) { - getRelation(workFlowLineages, workFlowRelations, processLineage); + if (!processLineages.isEmpty()) { + List workFlowLineages = workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages); + workFlowLineagesMap = workFlowLineages.stream().collect(Collectors.toMap(WorkFlowLineage::getWorkFlowCode, workFlowLineage -> workFlowLineage)); + Map> workFlowMap = new HashMap<>(); + for (ProcessLineage processLineage : processLineages) { + workFlowMap.compute(processLineage.getProcessDefinitionCode(), (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + if (processLineage.getPreTaskCode() > 0) { + v.add(new TaskDefinition(processLineage.getPreTaskCode(), processLineage.getPreTaskVersion())); + } + if (processLineage.getPostTaskCode() > 0) { + v.add(new TaskDefinition(processLineage.getPostTaskCode(), processLineage.getPostTaskVersion())); + } + return v; + }); + } + for (Entry> workFlow : workFlowMap.entrySet()) { + Set sourceWorkFlowCodes = querySourceWorkFlowCodes(projectCode, workFlow.getKey(), workFlow.getValue()); + if (sourceWorkFlowCodes.isEmpty()) { + workFlowRelations.add(new WorkFlowRelation(0L, workFlow.getKey())); + } else { + workFlowLineagesMap.get(workFlow.getKey()).setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes, Constants.COMMA)); + sourceWorkFlowCodes.forEach(code -> workFlowRelations.add(new WorkFlowRelation(code, workFlow.getKey()))); + } + } } - Map workFlowLists = new HashMap<>(); - workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages.values()); + workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineagesMap.values()); workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations); result.put(Constants.DATA_LIST, workFlowLists); putMsg(result, Status.SUCCESS); return result; } - private void getRelation(Map workFlowLineageMap, - Set workFlowRelations, - ProcessLineage processLineage) { - List relations = workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(), - processLineage.getProcessDefinitionCode(), processLineage.getPostTaskCode(), processLineage.getPostTaskVersion()); - if (!relations.isEmpty()) { - Set preWorkFlowCodes = new HashSet<>(); - List preRelations = workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(), - processLineage.getProcessDefinitionCode(), processLineage.getPreTaskCode(), processLineage.getPreTaskVersion()); - for (ProcessLineage preRelation : preRelations) { - preWorkFlowCodes.add(preRelation.getProcessDefinitionCode()); - } - ProcessLineage postRelation = relations.get(0); - WorkFlowLineage post = workFlowLineageMapper.queryWorkFlowLineageByCode(postRelation.getProjectCode(), postRelation.getProcessDefinitionCode()); - preWorkFlowCodes.remove(post.getWorkFlowCode()); - if (!workFlowLineageMap.containsKey(post.getWorkFlowCode())) { - post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, ",")); - workFlowLineageMap.put(post.getWorkFlowCode(), post); - } else { - WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowCode()); - String sourceWorkFlowCode = workFlowLineage.getSourceWorkFlowCode(); - if (StringUtils.isBlank(sourceWorkFlowCode)) { - post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, ",")); - } else { - if (!preWorkFlowCodes.isEmpty()) { - workFlowLineage.setSourceWorkFlowCode(sourceWorkFlowCode + "," + StringUtils.join(preWorkFlowCodes, ",")); + private Set querySourceWorkFlowCodes(long projectCode, long workFlowCode, List taskDefinitionList) { + Set sourceWorkFlowCodes = new HashSet<>(); + List taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList); + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { + if (taskDefinitionLog.getProjectCode() == projectCode) { + if (taskDefinitionLog.getTaskType().equals(TaskType.DEPENDENT.getDesc())) { + DependentParameters dependentParameters = JSONUtils.parseObject(taskDefinitionLog.getDependence(), DependentParameters.class); + if (dependentParameters != null) { + List dependTaskList = dependentParameters.getDependTaskList(); + for (DependentTaskModel taskModel : dependTaskList) { + List dependItemList = taskModel.getDependItemList(); + for (DependentItem dependentItem : dependItemList) { + if (dependentItem.getProjectCode() == projectCode && dependentItem.getDefinitionCode() != workFlowCode) { + sourceWorkFlowCodes.add(dependentItem.getDefinitionCode()); + } + } + } } } } - if (preWorkFlowCodes.isEmpty()) { - workFlowRelations.add(new WorkFlowRelation(0L, post.getWorkFlowCode())); - } else { - for (long workFlowCode : preWorkFlowCodes) { - workFlowRelations.add(new WorkFlowRelation(workFlowCode, post.getWorkFlowCode())); - } - } } + return sourceWorkFlowCodes; } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java index 5bbe2ace0a..fc7eee3a5f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; import java.util.ArrayList; @@ -57,6 +58,9 @@ public class WorkFlowLineageServiceTest { @Mock private ProjectMapper projectMapper; + @Mock + private TaskDefinitionLogMapper taskDefinitionLogMapper; + /** * get mock Project * @@ -97,23 +101,22 @@ public class WorkFlowLineageServiceTest { processLineage.setProcessDefinitionVersion(1); processLineage.setProjectCode(1111L); processLineages.add(processLineage); - WorkFlowLineage workFlowLineage = new WorkFlowLineage(); workFlowLineage.setSourceWorkFlowCode(""); + workFlowLineage.setWorkFlowCode(1111L); + List workFlowLineages = new ArrayList<>(); + workFlowLineages.add(workFlowLineage); when(projectMapper.queryByCode(1L)).thenReturn(project); when(workFlowLineageMapper.queryProcessLineage(project.getCode())).thenReturn(processLineages); - when(workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(), processLineage.getProcessDefinitionCode(), - processLineage.getPostTaskCode(), processLineage.getPreTaskVersion())).thenReturn(processLineages); - when(workFlowLineageMapper.queryWorkFlowLineageByCode(processLineage.getProjectCode(), processLineage.getProcessDefinitionCode())) - .thenReturn(workFlowLineage); + when(workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages)).thenReturn(workFlowLineages); Map result = workFlowLineageService.queryWorkFlowLineage(1L); Map workFlowLists = (Map) result.get(Constants.DATA_LIST); - Collection workFlowLineages = (Collection) workFlowLists.get(Constants.WORKFLOW_LIST); + Collection workFlowLineageList = (Collection) workFlowLists.get(Constants.WORKFLOW_LIST); Set workFlowRelations = (Set) workFlowLists.get(Constants.WORKFLOW_RELATION_LIST); - Assert.assertTrue(workFlowLineages.size() > 0); + Assert.assertTrue(workFlowLineageList.size() > 0); Assert.assertTrue(workFlowRelations.size() > 0); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java index 7d6f7d3df1..196ac0bb55 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.model; import org.apache.dolphinscheduler.common.enums.DependResult; @@ -23,16 +24,15 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; * dependent item */ public class DependentItem { - - private Long definitionCode; + private long projectCode; + private long definitionCode; private String depTasks; private String cycle; private String dateValue; private DependResult dependResult; private ExecutionStatus status; - - public String getKey(){ + public String getKey() { return String.format("%d-%s-%s-%s", getDefinitionCode(), getDepTasks(), @@ -40,11 +40,19 @@ public class DependentItem { getDateValue()); } - public Long getDefinitionCode() { + public long getProjectCode() { + return projectCode; + } + + public void setProjectCode(long projectCode) { + this.projectCode = projectCode; + } + + public long getDefinitionCode() { return definitionCode; } - public void setDefinitionCode(Long definitionCode) { + public void setDefinitionCode(long definitionCode) { this.definitionCode = definitionCode; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java index 5b4d7d94f3..f89992ec13 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.entity; import java.util.Objects; @@ -45,4 +46,22 @@ public class WorkFlowRelation { this.sourceWorkFlowCode = sourceWorkFlowCode; this.targetWorkFlowCode = targetWorkFlowCode; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkFlowRelation that = (WorkFlowRelation) o; + return sourceWorkFlowCode == that.sourceWorkFlowCode + && targetWorkFlowCode == that.targetWorkFlowCode; + } + + @Override + public int hashCode() { + return Objects.hash(sourceWorkFlowCode, targetWorkFlowCode); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java index d4c7838b4e..249e42afec 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.ProcessLineage; @@ -43,6 +44,14 @@ public interface WorkFlowLineageMapper { */ WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long projectCode, @Param("workFlowCode") long workFlowCode); + /** + * queryWorkFlowLineageByCode + * + * @param processLineages processLineages + * @return WorkFlowLineage list + */ + List queryWorkFlowLineageByLineage(@Param("processLineages") List processLineages); + /** * queryProcessLineage * @@ -54,13 +63,10 @@ public interface WorkFlowLineageMapper { /** * queryCodeRelation * - * @param taskCode taskCode - * @param taskVersion taskVersion + * @param projectCode projectCode * @param processDefinitionCode processDefinitionCode * @return ProcessLineage list */ - List queryCodeRelation(@Param("projectCode") long projectCode, - @Param("processDefinitionCode") long processDefinitionCode, - @Param("taskCode") long taskCode, - @Param("taskVersion") int taskVersion); + List queryProcessLineageByCode(@Param("projectCode") long projectCode, + @Param("processDefinitionCode") long processDefinitionCode); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index 366c4a60a1..638ac5d481 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -42,6 +42,26 @@ where tepd.project_code = #{projectCode} and tepd.code = #{workFlowCode} + + - select project_code, post_task_code, post_task_version, @@ -67,7 +87,5 @@ from t_ds_process_task_relation where project_code = #{projectCode} and process_definition_code = #{processDefinitionCode} - and post_task_code = #{taskCode} - and post_task_version = #{taskVersion} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java index da1a617eca..fc6b090b0b 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java @@ -125,8 +125,8 @@ public class WorkFlowLineageMapperTest { @Test public void testQueryCodeRelation() { ProcessTaskRelation processTaskRelation = insertOneProcessTaskRelation(); - List workFlowLineages = workFlowLineageMapper.queryCodeRelation(processTaskRelation.getProjectCode(), - processTaskRelation.getProcessDefinitionCode(), processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()); + List workFlowLineages = workFlowLineageMapper.queryProcessLineageByCode(processTaskRelation.getProjectCode(), + processTaskRelation.getProcessDefinitionCode()); Assert.assertNotEquals(workFlowLineages.size(), 0); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 2abce01923..5805b2d945 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.DateInterval; -import org.apache.dolphinscheduler.common.model.PreviousTaskNode; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag;