diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java index 5c1e3f427b..299e43d235 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_TASK_RELATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_EDGE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_PROCESS_RELATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.MOVE_PROCESS_TASK_RELATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_PROCESS_RELATION_ERROR; @@ -253,4 +254,35 @@ public class ProcessTaskRelationController extends BaseController { @PathVariable("taskCode") long taskCode) { return returnDataList(processTaskRelationService.queryDownstreamRelation(loginUser, projectCode, taskCode)); } + + /** + * delete edge + * + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCode process definition code + * @param preTaskCode pre task code + * @param postTaskCode post task code + * @return delete result code + */ + @ApiOperation(value = "deleteEdge", notes = "DELETE_EDGE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), + @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, type = "Long"), + @ApiImplicitParam(name = "preTaskCode", value = "PRE_TASK_CODE", required = true, type = "Long"), + @ApiImplicitParam(name = "postTaskCode", value = "POST_TASK_CODE", required = true, type = "Long") + }) + @DeleteMapping(value = "/{processDefinitionCode}/{preTaskCode}/{postTaskCode}") + @ResponseStatus(HttpStatus.OK) + @ApiException(DELETE_EDGE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result deleteEdge(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) + @PathVariable long projectCode, + @PathVariable long processDefinitionCode, + @PathVariable long preTaskCode, + @PathVariable long postTaskCode) { + return returnDataList(processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode)); + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 4fbec8efbd..dae4028177 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -288,6 +288,7 @@ public enum Status { TASK_HAS_DOWNSTREAM(50051, "Task [{0}] exists downstream dependence", "任务[{0}]存在下游依赖"), MAIN_TABLE_USING_VERSION(50052, "the version that the master table is using", "主表正在使用该版本"), PROJECT_PROCESS_NOT_MATCH(50053, "the project and the process is not match", "项目和工作流不匹配"), + DELETE_EDGE_ERROR(50054, "delete edge error", "删除工作流任务连接线错误"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java index 37099d5e21..4246823fe0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java @@ -123,4 +123,16 @@ public interface ProcessTaskRelationService { Map queryDownstreamRelation(User loginUser, long projectCode, long taskCode); + + /** + * delete edge + * + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCode process definition code + * @param preTaskCode pre task code + * @param postTaskCode post task code + * @return delete result code + */ + Map deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index d4e79bf350..5f1d0a39e4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -388,6 +388,57 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } + /** + * delete edge + * + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCode process definition code + * @param preTaskCode pre task code + * @param postTaskCode post task code + * @return delete result code + */ + @Override + public Map deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode) { + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + List processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode); + if (CollectionUtils.isEmpty(processTaskRelationList)) { + putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); + return result; + } + if (processTaskRelationList.size() > 1) { + putMsg(result, Status.DATA_IS_NOT_VALID, "processTaskRelationList"); + return result; + } + ProcessTaskRelation processTaskRelation = processTaskRelationList.get(0); + int upstreamCount = processTaskRelationMapper.countByCode(projectCode, processTaskRelation.getProcessDefinitionCode(), + 0L, processTaskRelation.getPostTaskCode()); + + if (upstreamCount == 0) { + putMsg(result, Status.DATA_IS_NULL, "upstreamCount"); + return result; + } + if (upstreamCount > 1) { + int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId()); + if (delete == 0) { + putMsg(result, Status.DELETE_EDGE_ERROR); + } + return result; + } + processTaskRelation.setPreTaskVersion(0); + processTaskRelation.setPreTaskCode(0L); + int update = processTaskRelationMapper.updateById(processTaskRelation); + if (update == 0) { + putMsg(result, Status.DELETE_EDGE_ERROR); + } + return result; + } + /** * build task definition * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java index 2b7b5a7fe9..3434fb3e0c 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java @@ -492,4 +492,34 @@ public class ProcessTaskRelationServiceTest { result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } + + @Test + public void testDeleteEdge() { + long projectCode = 1L; + long processDefinitionCode = 3L; + long preTaskCode = 4L; + long postTaskCode = 5L; + Project project = getProject(projectCode); + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, projectCode); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(projectCode); + processTaskRelation.setProcessDefinitionCode(processDefinitionCode); + processTaskRelation.setPreTaskCode(preTaskCode); + processTaskRelation.setPostTaskCode(postTaskCode); + List processTaskRelationList = new ArrayList<>(); + processTaskRelationList.add(processTaskRelation); + Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList); + Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1); + Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1); + Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1); + result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index a03f8f0b08..c0aa1c7b6c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -162,4 +162,18 @@ public interface ProcessTaskRelationMapper extends BaseMapper + +