Browse Source

[Fix-6156] [API] refactor workflow lineage api (#6157)

* fix mysql create sentence bug

* fix mysql create sentence bug

* fix genTaskCodeList return same code and save proces error

* refactor workflow lineage api

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
4ddfb855a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 46
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java
  2. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java
  3. 109
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
  4. 44
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java
  5. 31
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java
  6. 53
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java
  7. 28
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java
  8. 38
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java
  9. 38
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java
  10. 53
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml
  11. 29
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java

46
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java

@ -27,10 +27,8 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,23 +52,23 @@ import springfox.documentation.annotations.ApiIgnore;
*/
@Api(tags = "WORK_FLOW_LINEAGE_TAG")
@RestController
@RequestMapping("lineages/{projectCode}")
@RequestMapping("projects/{projectCode}/lineages")
public class WorkFlowLineageController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(WorkFlowLineageController.class);
@Autowired
private WorkFlowLineageService workFlowLineageService;
@ApiOperation(value = "queryWorkFlowLineageByName", notes = "QUERY_WORKFLOW_LINEAGE_BY_NAME_NOTES")
@GetMapping(value = "/list-name")
@ApiOperation(value = "queryLineageByWorkFlowName", notes = "QUERY_WORKFLOW_LINEAGE_BY_NAME_NOTES")
@GetMapping(value = "/query-by-name")
@ResponseStatus(HttpStatus.OK)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<List<WorkFlowLineage>> queryWorkFlowLineageByName(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true, example = "1") @PathVariable long projectCode,
@ApiIgnore @RequestParam(value = "searchVal", required = false) String searchVal) {
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "workFlowName", required = false) String workFlowName) {
try {
searchVal = ParameterUtils.handleEscapes(searchVal);
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByName(searchVal, projectCode);
workFlowName = ParameterUtils.handleEscapes(workFlowName);
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByName(projectCode, workFlowName);
return returnDataList(result);
} catch (Exception e) {
logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(), e);
@ -78,24 +76,30 @@ public class WorkFlowLineageController extends BaseController {
}
}
@ApiOperation(value = "queryWorkFlowLineageByIds", notes = "QUERY_WORKFLOW_LINEAGE_BY_IDS_NOTES")
@GetMapping(value = "/list-ids")
@ApiOperation(value = "queryLineageByWorkFlowCode", notes = "QUERY_WORKFLOW_LINEAGE_BY_CODES_NOTES")
@GetMapping(value = "/{workFlowCode}")
@ResponseStatus(HttpStatus.OK)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<Map<String, Object>> queryWorkFlowLineageByIds(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true, example = "1") @PathVariable long projectCode,
@ApiIgnore @RequestParam(value = "ids", required = false) String ids) {
public Result<Map<String, Object>> queryWorkFlowLineageByCode(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "workFlowCode", required = true) long workFlowCode) {
try {
ids = ParameterUtils.handleEscapes(ids);
Set<Integer> idsSet = new HashSet<>();
if (ids != null) {
String[] idsStr = ids.split(",");
for (String id : idsStr) {
idsSet.add(Integer.parseInt(id));
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByCode(projectCode, workFlowCode);
return returnDataList(result);
} catch (Exception e) {
logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(), e);
return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(), QUERY_WORKFLOW_LINEAGE_ERROR.getMsg());
}
}
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByIds(idsSet, projectCode);
@ApiOperation(value = "queryWorkFlowList", notes = "QUERY_WORKFLOW_LINEAGE_NOTES")
@GetMapping(value = "/list")
@ResponseStatus(HttpStatus.OK)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<Map<String, Object>> queryWorkFlowLineageByIds(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode) {
try {
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineage(projectCode);
return returnDataList(result);
} catch (Exception e) {
logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(), e);

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java

@ -25,8 +25,9 @@ import java.util.Set;
*/
public interface WorkFlowLineageService {
Map<String, Object> queryWorkFlowLineageByName(String workFlowName, long projectCode);
Map<String, Object> queryWorkFlowLineageByName(long projectCode, String workFlowName);
Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, long projectCode);
Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long workFlowCode);
Map<String, Object> queryWorkFlowLineage(long projectCode);
}

109
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java

@ -50,65 +50,44 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
private ProjectMapper projectMapper;
@Override
public Map<String, Object> queryWorkFlowLineageByName(String workFlowName, long projectCode) {
Project project = projectMapper.queryByCode(projectCode);
public Map<String, Object> queryWorkFlowLineageByName(long projectCode, String workFlowName) {
Map<String, Object> result = new HashMap<>();
List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByName(workFlowName, project.getCode());
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
return result;
}
List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryWorkFlowLineageByName(projectCode, workFlowName);
result.put(Constants.DATA_LIST, workFlowLineageList);
putMsg(result, Status.SUCCESS);
return result;
}
private void getRelation(Map<Integer, WorkFlowLineage> workFlowLineageMap,
Set<WorkFlowRelation> workFlowRelations,
ProcessLineage processLineage) {
List<ProcessLineage> relations = workFlowLineageMapper.queryCodeRelation(
processLineage.getPostTaskCode(), processLineage.getPostTaskVersion(),
processLineage.getProcessDefinitionCode(), processLineage.getProjectCode());
if (!relations.isEmpty()) {
Set<Integer> preWorkFlowIds = new HashSet<>();
List<ProcessLineage> preRelations = workFlowLineageMapper.queryCodeRelation(
processLineage.getPreTaskCode(), processLineage.getPreTaskVersion(),
processLineage.getProcessDefinitionCode(), processLineage.getProjectCode());
for (ProcessLineage preRelation : preRelations) {
WorkFlowLineage pre = workFlowLineageMapper.queryWorkFlowLineageByCode(
preRelation.getProcessDefinitionCode(), preRelation.getProjectCode());
preWorkFlowIds.add(pre.getWorkFlowId());
}
ProcessLineage postRelation = relations.get(0);
WorkFlowLineage post = workFlowLineageMapper.queryWorkFlowLineageByCode(
postRelation.getProcessDefinitionCode(), postRelation.getProjectCode());
if (!workFlowLineageMap.containsKey(post.getWorkFlowId())) {
post.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds, ","));
workFlowLineageMap.put(post.getWorkFlowId(), post);
} else {
WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowId());
String sourceWorkFlowId = workFlowLineage.getSourceWorkFlowId();
if (sourceWorkFlowId.equals("")) {
workFlowLineage.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds, ","));
} else {
if (!preWorkFlowIds.isEmpty()) {
workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + "," + StringUtils.join(preWorkFlowIds, ","));
}
}
}
if (preWorkFlowIds.isEmpty()) {
workFlowRelations.add(new WorkFlowRelation(0, post.getWorkFlowId()));
} else {
for (Integer workFlowId : preWorkFlowIds) {
workFlowRelations.add(new WorkFlowRelation(workFlowId, post.getWorkFlowId()));
}
}
@Override
public Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long workFlowCode) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
return result;
}
WorkFlowLineage workFlowLineage = workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, workFlowCode);
result.put(Constants.DATA_LIST, workFlowLineage);
putMsg(result, Status.SUCCESS);
return result;
}
@Override
public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, long projectCode) {
public Map<String, Object> queryWorkFlowLineage(long projectCode) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
List<ProcessLineage> processLineages = workFlowLineageMapper.queryRelationByIds(ids, project.getCode());
if (project == null) {
putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
return result;
}
List<ProcessLineage> processLineages = workFlowLineageMapper.queryProcessLineage(projectCode);
Map<Integer, WorkFlowLineage> workFlowLineages = new HashMap<>();
Map<Long, WorkFlowLineage> workFlowLineages = new HashMap<>();
Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
for (ProcessLineage processLineage : processLineages) {
@ -123,4 +102,42 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
return result;
}
private void getRelation(Map<Long, WorkFlowLineage> workFlowLineageMap,
Set<WorkFlowRelation> workFlowRelations,
ProcessLineage processLineage) {
List<ProcessLineage> relations = workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
processLineage.getProcessDefinitionCode(), processLineage.getPostTaskCode(), processLineage.getPostTaskVersion());
if (!relations.isEmpty()) {
Set<Long> preWorkFlowCodes = new HashSet<>();
List<ProcessLineage> preRelations = workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(),
processLineage.getProcessDefinitionCode(), processLineage.getPreTaskCode(), processLineage.getPreTaskVersion());
for (ProcessLineage preRelation : preRelations) {
preWorkFlowCodes.add(preRelation.getProcessDefinitionCode());
}
ProcessLineage postRelation = relations.get(0);
WorkFlowLineage post = workFlowLineageMapper.queryWorkFlowLineageByCode(postRelation.getProjectCode(), postRelation.getProcessDefinitionCode());
preWorkFlowCodes.remove(post.getWorkFlowCode());
if (!workFlowLineageMap.containsKey(post.getWorkFlowCode())) {
post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, ","));
workFlowLineageMap.put(post.getWorkFlowCode(), post);
} else {
WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowCode());
String sourceWorkFlowCode = workFlowLineage.getSourceWorkFlowCode();
if (StringUtils.isBlank(sourceWorkFlowCode)) {
post.setSourceWorkFlowCode(StringUtils.join(preWorkFlowCodes, ","));
} else {
if (!preWorkFlowCodes.isEmpty()) {
workFlowLineage.setSourceWorkFlowCode(sourceWorkFlowCode + "," + StringUtils.join(preWorkFlowCodes, ","));
}
}
}
if (preWorkFlowCodes.isEmpty()) {
workFlowRelations.add(new WorkFlowRelation(0L, post.getWorkFlowCode()));
} else {
for (long workFlowCode : preWorkFlowCodes) {
workFlowRelations.add(new WorkFlowRelation(workFlowCode, post.getWorkFlowCode()));
}
}
}
}
}

44
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java

@ -21,22 +21,27 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.WorkFlowLineageServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.User;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* work flow lineage controller test
*/
public class WorkFlowLineageControllerTest extends AbstractControllerTest {
@RunWith(MockitoJUnitRunner.class)
public class WorkFlowLineageControllerTest {
@InjectMocks
private WorkFlowLineageController workFlowLineageController;
@ -44,6 +49,26 @@ public class WorkFlowLineageControllerTest extends AbstractControllerTest {
@Mock
private WorkFlowLineageServiceImpl workFlowLineageService;
protected User user;
@Before
public void before() {
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.GENERAL_USER);
loginUser.setUserName("admin");
user = loginUser;
}
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
} else {
result.put(Constants.MSG, status.getMsg());
}
}
@Test
public void testQueryWorkFlowLineageByName() {
long projectCode = 1L;
@ -51,23 +76,20 @@ public class WorkFlowLineageControllerTest extends AbstractControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1);
Mockito.when(workFlowLineageService.queryWorkFlowLineageByName(searchVal, projectCode)).thenReturn(result);
Mockito.when(workFlowLineageService.queryWorkFlowLineageByName(projectCode, searchVal)).thenReturn(result);
Result response = workFlowLineageController.queryWorkFlowLineageByName(user, projectCode, searchVal);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
@Test
public void testQueryWorkFlowLineageByIds() {
public void testQueryWorkFlowLineageByCode() {
long projectCode = 1L;
String ids = "1";
long code = 1L;
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1);
Set<Integer> idSet = new HashSet<>();
idSet.add(1);
Mockito.when(workFlowLineageService.queryWorkFlowLineageByIds(idSet, projectCode)).thenReturn(result);
Result response = workFlowLineageController.queryWorkFlowLineageByIds(user, projectCode, ids);
Mockito.when(workFlowLineageService.queryWorkFlowLineageByCode(projectCode, code)).thenReturn(result);
Result response = workFlowLineageController.queryWorkFlowLineageByCode(user, projectCode, code);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
}

31
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java

@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -76,20 +75,16 @@ public class WorkFlowLineageServiceTest {
@Test
public void testQueryWorkFlowLineageByName() {
Project project = getProject("test");
String searchVal = "test";
String name = "test";
when(projectMapper.queryByCode(1L)).thenReturn(project);
when(workFlowLineageMapper.queryByName(Mockito.any(), Mockito.any())).thenReturn(getWorkFlowLineages());
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByName(searchVal, 1L);
when(workFlowLineageMapper.queryWorkFlowLineageByName(Mockito.anyLong(), Mockito.any())).thenReturn(getWorkFlowLineages());
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByName(1L, name);
List<WorkFlowLineage> workFlowLineageList = (List<WorkFlowLineage>) result.get(Constants.DATA_LIST);
Assert.assertTrue(workFlowLineageList.size() > 0);
}
@Test
public void testQueryWorkFlowLineageByIds() {
Set<Integer> ids = new HashSet<>();
ids.add(1);
ids.add(2);
public void testQueryWorkFlowLineage() {
Project project = getProject("test");
List<ProcessLineage> processLineages = new ArrayList<>();
@ -104,20 +99,16 @@ public class WorkFlowLineageServiceTest {
processLineages.add(processLineage);
WorkFlowLineage workFlowLineage = new WorkFlowLineage();
workFlowLineage.setSourceWorkFlowId("");
workFlowLineage.setSourceWorkFlowCode("");
when(projectMapper.queryByCode(1L)).thenReturn(project);
when(workFlowLineageMapper.queryRelationByIds(ids, project.getCode())).thenReturn(processLineages);
when(workFlowLineageMapper.queryCodeRelation(processLineage.getPostTaskCode()
, processLineage.getPreTaskVersion()
, processLineage.getProcessDefinitionCode()
, processLineage.getProjectCode()))
.thenReturn(processLineages);
when(workFlowLineageMapper
.queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()))
when(workFlowLineageMapper.queryProcessLineage(project.getCode())).thenReturn(processLineages);
when(workFlowLineageMapper.queryCodeRelation(processLineage.getProjectCode(), processLineage.getProcessDefinitionCode(),
processLineage.getPostTaskCode(), processLineage.getPreTaskVersion())).thenReturn(processLineages);
when(workFlowLineageMapper.queryWorkFlowLineageByCode(processLineage.getProjectCode(), processLineage.getProcessDefinitionCode()))
.thenReturn(workFlowLineage);
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByIds(ids, 1L);
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineage(1L);
Map<String, Object> workFlowLists = (Map<String, Object>) result.get(Constants.DATA_LIST);
Collection<WorkFlowLineage> workFlowLineages = (Collection<WorkFlowLineage>) workFlowLists.get(Constants.WORKFLOW_LIST);
@ -129,7 +120,7 @@ public class WorkFlowLineageServiceTest {
private List<WorkFlowLineage> getWorkFlowLineages() {
List<WorkFlowLineage> workFlowLineages = new ArrayList<>();
WorkFlowLineage workFlowLineage = new WorkFlowLineage();
workFlowLineage.setWorkFlowId(1);
workFlowLineage.setWorkFlowCode(1);
workFlowLineage.setWorkFlowName("testdag");
workFlowLineages.add(workFlowLineage);
return workFlowLineages;

53
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessLineage.java

@ -25,12 +25,12 @@ public class ProcessLineage {
/**
* project code
*/
private Long projectCode;
private long projectCode;
/**
* post task code
*/
private Long postTaskCode;
private long postTaskCode;
/**
* post task version
@ -40,7 +40,7 @@ public class ProcessLineage {
/**
* pre task code
*/
private Long preTaskCode;
private long preTaskCode;
/**
* pre task version
@ -50,46 +50,42 @@ public class ProcessLineage {
/**
* process definition code
*/
private Long processDefinitionCode;
private long processDefinitionCode;
/**
* process definition version
*/
private int processDefinitionVersion;
public Long getProjectCode() {
public long getProjectCode() {
return projectCode;
}
public void setProjectCode(Long projectCode) {
public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
public Long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionCode(Long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
public long getPostTaskCode() {
return postTaskCode;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
public void setPostTaskCode(long postTaskCode) {
this.postTaskCode = postTaskCode;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
public int getPostTaskVersion() {
return postTaskVersion;
}
public void setPostTaskCode(Long postTaskCode) {
this.postTaskCode = postTaskCode;
public void setPostTaskVersion(int postTaskVersion) {
this.postTaskVersion = postTaskVersion;
}
public Long getPreTaskCode() {
public long getPreTaskCode() {
return preTaskCode;
}
public void setPreTaskCode(Long preTaskCode) {
public void setPreTaskCode(long preTaskCode) {
this.preTaskCode = preTaskCode;
}
@ -101,20 +97,19 @@ public class ProcessLineage {
this.preTaskVersion = preTaskVersion;
}
public int getPostTaskVersion() {
return postTaskVersion;
public long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setPostTaskVersion(int postTaskVersion) {
this.postTaskVersion = postTaskVersion;
public void setProcessDefinitionCode(long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public long getPostTaskCode() {
return postTaskCode;
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setPostTaskCode(long postTaskCode) {
this.postTaskCode = postTaskCode;
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
}

28
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowLineage.java

@ -19,29 +19,21 @@ package org.apache.dolphinscheduler.dao.entity;
import java.util.Date;
public class WorkFlowLineage {
private int workFlowId;
private long workFlowCode;
private String workFlowName;
private String workFlowPublishStatus;
private Date scheduleStartTime;
private Date scheduleEndTime;
private String crontab;
private int schedulePublishStatus;
private String sourceWorkFlowId;
private String sourceWorkFlowCode;
public String getSourceWorkFlowId() {
return sourceWorkFlowId;
public long getWorkFlowCode() {
return workFlowCode;
}
public void setSourceWorkFlowId(String sourceWorkFlowId) {
this.sourceWorkFlowId = sourceWorkFlowId;
}
public int getWorkFlowId() {
return workFlowId;
}
public void setWorkFlowId(int workFlowId) {
this.workFlowId = workFlowId;
public void setWorkFlowCode(long workFlowCode) {
this.workFlowCode = workFlowCode;
}
public String getWorkFlowName() {
@ -91,4 +83,12 @@ public class WorkFlowLineage {
public void setSchedulePublishStatus(int schedulePublishStatus) {
this.schedulePublishStatus = schedulePublishStatus;
}
public String getSourceWorkFlowCode() {
return sourceWorkFlowCode;
}
public void setSourceWorkFlowCode(String sourceWorkFlowCode) {
this.sourceWorkFlowCode = sourceWorkFlowCode;
}
}

38
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java

@ -19,42 +19,30 @@ package org.apache.dolphinscheduler.dao.entity;
import java.util.Objects;
public class WorkFlowRelation {
private int sourceWorkFlowId;
private int targetWorkFlowId;
private long sourceWorkFlowCode;
private long targetWorkFlowCode;
public int getSourceWorkFlowId() {
return sourceWorkFlowId;
public long getSourceWorkFlowCode() {
return sourceWorkFlowCode;
}
public void setSourceWorkFlowId(int sourceWorkFlowId) {
this.sourceWorkFlowId = sourceWorkFlowId;
public void setSourceWorkFlowCode(long sourceWorkFlowCode) {
this.sourceWorkFlowCode = sourceWorkFlowCode;
}
public int getTargetWorkFlowId() {
return targetWorkFlowId;
public long getTargetWorkFlowCode() {
return targetWorkFlowCode;
}
public void setTargetWorkFlowId(int targetWorkFlowId) {
this.targetWorkFlowId = targetWorkFlowId;
public void setTargetWorkFlowCode(long targetWorkFlowCode) {
this.targetWorkFlowCode = targetWorkFlowCode;
}
public WorkFlowRelation() {
}
public WorkFlowRelation(int sourceWorkFlowId, int targetWorkFlowId) {
this.sourceWorkFlowId = sourceWorkFlowId;
this.targetWorkFlowId = targetWorkFlowId;
}
@Override
public boolean equals(Object obj) {
return obj instanceof WorkFlowRelation
&& this.sourceWorkFlowId == ((WorkFlowRelation) obj).getSourceWorkFlowId()
&& this.targetWorkFlowId == ((WorkFlowRelation) obj).getTargetWorkFlowId();
}
@Override
public int hashCode() {
return Objects.hash(sourceWorkFlowId, targetWorkFlowId);
public WorkFlowRelation(long sourceWorkFlowCode, long targetWorkFlowCode) {
this.sourceWorkFlowCode = sourceWorkFlowCode;
this.targetWorkFlowCode = targetWorkFlowCode;
}
}

38
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java

@ -22,47 +22,45 @@ import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Set;
public interface WorkFlowLineageMapper {
/**
* queryByName
*
* @param searchVal searchVal
* @param projectCode projectCode
* @param workFlowName workFlowName
* @return WorkFlowLineage list
*/
List<WorkFlowLineage> queryByName(@Param("searchVal") String searchVal, @Param("projectCode") Long projectCode);
List<WorkFlowLineage> queryWorkFlowLineageByName(@Param("projectCode") long projectCode, @Param("workFlowName") String workFlowName);
/**
* queryCodeRelation
* queryWorkFlowLineageByCode
*
* @param taskCode taskCode
* @param taskVersion taskVersion
* @param processDefinitionCode processDefinitionCode
* @return ProcessLineage
* @param projectCode projectCode
* @param workFlowCode workFlowCode
* @return WorkFlowLineage
*/
List<ProcessLineage> queryCodeRelation(
@Param("taskCode") Long taskCode, @Param("taskVersion") int taskVersion,
@Param("processDefinitionCode") Long processDefinitionCode, @Param("projectCode") Long projectCode);
WorkFlowLineage queryWorkFlowLineageByCode(@Param("projectCode") long projectCode, @Param("workFlowCode") long workFlowCode);
/**
* queryRelationByIds
* queryProcessLineage
*
* @param ids ids
* @param projectCode projectCode
* @return ProcessLineage
* @return ProcessLineage list
*/
List<ProcessLineage> queryRelationByIds(@Param("ids") Set<Integer> ids, @Param("projectCode") Long projectCode);
List<ProcessLineage> queryProcessLineage(@Param("projectCode") long projectCode);
/**
* queryWorkFlowLineageByCode
* queryCodeRelation
*
* @param taskCode taskCode
* @param taskVersion taskVersion
* @param processDefinitionCode processDefinitionCode
* @param projectCode projectCode
* @return WorkFlowLineage
* @return ProcessLineage list
*/
WorkFlowLineage queryWorkFlowLineageByCode(@Param("processDefinitionCode") Long processDefinitionCode, @Param("projectCode") Long projectCode);
List<ProcessLineage> queryCodeRelation(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode,
@Param("taskCode") long taskCode,
@Param("taskVersion") int taskVersion);
}

53
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

@ -19,17 +19,30 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper">
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.id as work_flow_id,tepd.name as work_flow_name
<select id="queryWorkFlowLineageByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.code as work_flow_code,tepd.name as work_flow_name
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where tepd.project_code = #{projectCode}
<if test="searchVal != null and searchVal != ''">
and tepd.name like concat('%', #{searchVal}, '%')
<if test="workFlowName != null and workFlowName != ''">
and tepd.name like concat('%', #{workFlowName}, '%')
</if>
</select>
<select id="queryRelationByIds" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
<select id="queryWorkFlowLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.code as work_flow_code,tepd.name as work_flow_name,
"" as source_work_flow_code,
tepd.release_state as work_flow_publish_status,
tes.start_time as schedule_start_time,
tes.end_time as schedule_end_time,
tes.crontab as crontab,
tes.release_state as schedule_publish_status
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where tepd.project_code = #{projectCode} and tepd.code = #{workFlowCode}
</select>
<select id="queryProcessLineage" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
select ptr.project_code,
ptr.post_task_code,
ptr.post_task_version,
@ -38,15 +51,9 @@
ptr.process_definition_code,
ptr.process_definition_version
from t_ds_process_definition pd
join t_ds_process_task_relation ptr on pd.code = ptr.process_definition_code and pd.version =
ptr.process_definition_version
join t_ds_process_task_relation ptr on pd.code = ptr.process_definition_code
and pd.version = ptr.process_definition_version
where pd.project_code = #{projectCode}
<if test="ids != null and ids.size()>0">
and pd.id in
<foreach collection="ids" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>
<select id="queryCodeRelation" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
@ -58,23 +65,9 @@
process_definition_code,
process_definition_version
from t_ds_process_task_relation
where post_task_code = #{taskCode}
and post_task_version = #{taskVersion}
where project_code = #{projectCode}
and process_definition_code = #{processDefinitionCode}
and project_code = #{projectCode}
</select>
<select id="queryWorkFlowLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.id as work_flow_id,tepd.name as work_flow_name,
"" as source_work_flow_id,
tepd.release_state as work_flow_publish_status,
tes.start_time as schedule_start_time,
tes.end_time as schedule_end_time,
tes.crontab as crontab,
tes.release_state as schedule_publish_status
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.code = tes.process_definition_code
where tepd.project_code = #{projectCode} and tepd.code = #{processDefinitionCode}
and post_task_code = #{taskCode}
and post_task_version = #{taskVersion}
</select>
</mapper>

29
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java

@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import org.junit.Assert;
@ -81,9 +80,8 @@ public class WorkFlowLineageMapperTest {
/**
* insert
*
* @return ProcessDefinition
*/
private ProcessDefinition insertOneProcessDefinition() {
private void insertOneProcessDefinition() {
//insertOne
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
@ -93,15 +91,13 @@ public class WorkFlowLineageMapperTest {
processDefinition.setUpdateTime(new Date());
processDefinition.setCreateTime(new Date());
processDefinitionMapper.insert(processDefinition);
return processDefinition;
}
/**
* insert
*
* @return Schedule
*/
private Schedule insertOneSchedule(int id) {
private void insertOneSchedule(int id) {
//insertOne
Schedule schedule = new Schedule();
schedule.setStartTime(new Date());
@ -114,38 +110,32 @@ public class WorkFlowLineageMapperTest {
schedule.setUpdateTime(new Date());
schedule.setProcessDefinitionCode(id);
scheduleMapper.insert(schedule);
return schedule;
}
@Test
public void testQueryByName() {
public void testQueryWorkFlowLineageByName() {
insertOneProcessDefinition();
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L);
insertOneSchedule(processDefinition.getId());
List<WorkFlowLineage> workFlowLineages = workFlowLineageMapper.queryByName(processDefinition.getName(), processDefinition.getProjectCode());
List<WorkFlowLineage> workFlowLineages = workFlowLineageMapper.queryWorkFlowLineageByName(processDefinition.getProjectCode(), processDefinition.getName());
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
@Test
public void testQueryCodeRelation() {
ProcessTaskRelation processTaskRelation = insertOneProcessTaskRelation();
List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryCodeRelation(
processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion(),
processTaskRelation.getProcessDefinitionCode(), processTaskRelation.getProjectCode());
List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryCodeRelation(processTaskRelation.getProjectCode(),
processTaskRelation.getProcessDefinitionCode(), processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion());
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
@Test
public void testQueryRelationByIds() {
public void testQueryWorkFlowLineage() {
insertOneProcessDefinition();
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L);
insertOneProcessTaskRelation();
HashSet<Integer> set = new HashSet<>();
set.add(processDefinition.getId());
List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryRelationByIds(set, processDefinition.getProjectCode());
List<ProcessLineage> workFlowLineages = workFlowLineageMapper.queryProcessLineage(processDefinition.getProjectCode());
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
@ -154,8 +144,7 @@ public class WorkFlowLineageMapperTest {
insertOneProcessDefinition();
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(1L);
insertOneSchedule(processDefinition.getId());
WorkFlowLineage workFlowLineages = workFlowLineageMapper.queryWorkFlowLineageByCode(processDefinition.getCode(), processDefinition.getProjectCode());
WorkFlowLineage workFlowLineages = workFlowLineageMapper.queryWorkFlowLineageByCode(processDefinition.getProjectCode(), processDefinition.getCode());
Assert.assertNotNull(workFlowLineages);
}

Loading…
Cancel
Save