Browse Source

[Feature][JsonSplit] fix process lineage bug (#5418)

* update taskParams/add task delayTime/fix conditionType bug

* update codeStyle for merge to dev

* fix process lineage bug

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
363426de71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 66
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
  2. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
  3. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
  4. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

66
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java

@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage; import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
@ -46,9 +46,6 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
@Autowired @Autowired
private WorkFlowLineageMapper workFlowLineageMapper; private WorkFlowLineageMapper workFlowLineageMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired @Autowired
private ProjectMapper projectMapper; private ProjectMapper projectMapper;
@ -66,48 +63,41 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
Set<WorkFlowRelation> workFlowRelations, Set<WorkFlowRelation> workFlowRelations,
ProcessLineage processLineage) { ProcessLineage processLineage) {
List<ProcessLineage> relations = workFlowLineageMapper.queryCodeRelation( List<ProcessLineage> relations = workFlowLineageMapper.queryCodeRelation(
processLineage.getPostTaskCode(), processLineage.getPostTaskVersion() processLineage.getPostTaskCode(), processLineage.getPostTaskVersion(),
, processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); processLineage.getProcessDefinitionCode(), processLineage.getProjectCode());
if (!relations.isEmpty()) {
for (ProcessLineage relation : relations) { Set<Integer> preWorkFlowIds = new HashSet<>();
if (relation.getProcessDefinitionCode() != null) { List<ProcessLineage> preRelations = workFlowLineageMapper.queryCodeRelation(
processLineage.getPreTaskCode(), processLineage.getPreTaskVersion(),
relation.setPreTaskCode(processLineage.getPostTaskCode()); processLineage.getProcessDefinitionCode(), processLineage.getProjectCode());
relation.setPreTaskVersion(processLineage.getPostTaskVersion()); for (ProcessLineage preRelation : preRelations) {
WorkFlowLineage pre = workFlowLineageMapper.queryWorkFlowLineageByCode(
WorkFlowLineage pre = workFlowLineageMapper preRelation.getProcessDefinitionCode(), preRelation.getProjectCode());
.queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); preWorkFlowIds.add(pre.getWorkFlowId());
// sourceWorkFlowId = ""
if (!workFlowLineageMap.containsKey(pre.getWorkFlowId())) {
workFlowLineageMap.put(pre.getWorkFlowId(), pre);
} }
ProcessLineage postRelation = relations.get(0);
WorkFlowLineage post = workFlowLineageMapper WorkFlowLineage post = workFlowLineageMapper.queryWorkFlowLineageByCode(
.queryWorkFlowLineageByCode(relation.getProcessDefinitionCode(), relation.getProjectCode()); postRelation.getProcessDefinitionCode(), postRelation.getProjectCode());
if (!workFlowLineageMap.containsKey(post.getWorkFlowId())) {
if (workFlowLineageMap.containsKey(post.getWorkFlowId())) { post.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds, ","));
workFlowLineageMap.put(post.getWorkFlowId(), post);
} else {
WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowId()); WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowId());
String sourceWorkFlowId = workFlowLineage.getSourceWorkFlowId(); String sourceWorkFlowId = workFlowLineage.getSourceWorkFlowId();
if (sourceWorkFlowId.equals("")) { if (sourceWorkFlowId.equals("")) {
workFlowLineage.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId())); workFlowLineage.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds, ","));
} else { } else {
workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + "," + pre.getWorkFlowId()); if (!preWorkFlowIds.isEmpty()) {
workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + "," + StringUtils.join(preWorkFlowIds, ","));
} }
} else {
post.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId()));
workFlowLineageMap.put(post.getWorkFlowId(), post);
} }
WorkFlowRelation workFlowRelation = new WorkFlowRelation();
workFlowRelation.setSourceWorkFlowId(pre.getWorkFlowId());
workFlowRelation.setTargetWorkFlowId(post.getWorkFlowId());
if (workFlowRelations.contains(workFlowRelation)) {
continue;
} }
workFlowRelations.add(workFlowRelation); if (preWorkFlowIds.isEmpty()) {
getRelation(workFlowLineageMap, workFlowRelations, relation); workFlowRelations.add(new WorkFlowRelation(0, post.getWorkFlowId()));
} else {
for (Integer workFlowId : preWorkFlowIds) {
workFlowRelations.add(new WorkFlowRelation(workFlowId, post.getWorkFlowId()));
}
} }
} }
} }

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java

@ -38,6 +38,14 @@ public class WorkFlowRelation {
this.targetWorkFlowId = targetWorkFlowId; this.targetWorkFlowId = targetWorkFlowId;
} }
public WorkFlowRelation() {
}
public WorkFlowRelation(int sourceWorkFlowId, int targetWorkFlowId) {
this.sourceWorkFlowId = sourceWorkFlowId;
this.targetWorkFlowId = targetWorkFlowId;
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
return obj instanceof WorkFlowRelation return obj instanceof WorkFlowRelation

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java

@ -59,7 +59,7 @@ public interface WorkFlowLineageMapper {
/** /**
* queryWorkFlowLineageByCode * queryWorkFlowLineageByCode
* *
* @param processDefinitionCode processDefinitioncode * @param processDefinitionCode processDefinitionCode
* @param projectCode projectCode * @param projectCode projectCode
* @return WorkFlowLineage * @return WorkFlowLineage
*/ */

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

@ -50,19 +50,18 @@
</select> </select>
<select id="queryCodeRelation" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage"> <select id="queryCodeRelation" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
select project_code select project_code,
post_task_code, post_task_code,
post_task_version, post_task_version,
pre_task_code, pre_task_code,
pre_task_version, pre_task_version,
process_definition_code, process_definition_code,
process_definition_version process_definition_version
from t_ds_process_task_relation ptr from t_ds_process_task_relation
where ptr.pre_task_code=#{taskCode} where post_task_code = #{taskCode}
and ptr.pre_task_version=#{taskVersion} and post_task_version = #{taskVersion}
and ptr.process_definition_code!=#{processDefinitionCode} and process_definition_code = #{processDefinitionCode}
and ptr.project_code =#{projectCode} and project_code = #{projectCode}
</select> </select>
<select id="queryWorkFlowLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage" <select id="queryWorkFlowLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage"

Loading…
Cancel
Save