diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index 3d6e4a25c7..9a476c8e8b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.commons.collections.CollectionUtils; @@ -50,6 +51,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import com.google.common.collect.Lists; + /** * process task relation service impl */ @@ -202,29 +205,72 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P /** * delete task upstream relation * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param preTaskCodes the pre task codes, sep ',' - * @param taskCode the post task code + * @param taskCode the post task code * @return delete result code */ @Override public Map deleteUpstreamRelation(User loginUser, long projectCode, String preTaskCodes, long taskCode) { - return null; + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + if (StringUtils.isEmpty(preTaskCodes)) { + putMsg(result,Status.DATA_IS_NULL,"preTaskCodes"); + return result; + } + Set preTaskCodesSet = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); + Status status = deleteUpstreamRelation(projectCode, preTaskCodesSet.toArray(new Long[0]), taskCode); + if (status != Status.SUCCESS) { + putMsg(result, status); + } + return result; } /** * delete task downstream relation * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param postTaskCodes the post task codes, sep ',' - * @param taskCode the pre task code + * @param taskCode the pre task code * @return delete result code */ @Override public Map deleteDownstreamRelation(User loginUser, long projectCode, String postTaskCodes, long taskCode) { - return null; + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + if (StringUtils.isEmpty(postTaskCodes)) { + putMsg(result,Status.DATA_IS_NULL,"postTaskCodes"); + return result; + } + Set postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); + List deleteFailedCodeList = new ArrayList<>(); + postTaskCodesSet.stream().forEach( + postTaskCode -> { + try { + Status status = deleteUpstreamRelation(projectCode, new Long[]{taskCode}, postTaskCode); + if (Status.SUCCESS != status) { + deleteFailedCodeList.add(postTaskCode); + } + } catch (Exception e) { + deleteFailedCodeList.add(postTaskCode); + } + + } + ); + if (!deleteFailedCodeList.isEmpty()) { + putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR, String.join(",", deleteFailedCodeList.stream().map(o -> o + "").collect(Collectors.toList()))); + } + return result; } /** @@ -328,4 +374,61 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P }; } + /** + * delete upstream relation + * + * @param projectCode project code + * @param preTaskCodes pre task codes + * @param taskCode pre task code + * @return status + */ + private Status deleteUpstreamRelation(long projectCode, Long[] preTaskCodes, long taskCode) { + List upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes); + if (CollectionUtils.isEmpty(upstreamList)) { + return Status.SUCCESS; + } + Map> processTaskRelationListGroupByProcessDefinitionCode = upstreamList.stream() + .collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode)); + // count upstream relation group by process definition code + List> countListGroupByProcessDefinitionCode = processTaskRelationMapper + .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode); + + List deletes = new ArrayList<>(); + List updates = new ArrayList<>(); + + countListGroupByProcessDefinitionCode.stream().forEach( + processDefinitionCodeUpstreamCountMap -> + processDefinitionCodeUpstreamCountMap.entrySet().stream().forEach( + o -> { + Long processDefinitionCode = o.getKey(); + Integer count = o.getValue(); + List processTaskRelationList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode); + if (count <= processTaskRelationList.size()) { + ProcessTaskRelation processTaskRelation = processTaskRelationList.remove(0); + if (processTaskRelation.getPreTaskCode() != 0) { + processTaskRelation.setPreTaskCode(0); + processTaskRelation.setPreTaskVersion(0); + updates.add(processTaskRelation); + } + } + if (!processTaskRelationList.isEmpty()) { + deletes.addAll(processTaskRelationList); + } + } + ) + ); + + int update = 0; + if (!updates.isEmpty()) { + update = processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(updates); + } + int delete = 0; + if (!deletes.isEmpty()) { + delete = processTaskRelationMapper.deleteBatchIds(deletes.stream().map(ProcessTaskRelation::getId).collect(Collectors.toList())); + } + if (update < 0 || delete < 0) { + return Status.DELETE_TASK_PROCESS_RELATION_ERROR; + } + return Status.SUCCESS; + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java index 758eceb4e4..55efbfe088 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java @@ -186,15 +186,18 @@ public class ProcessTaskRelationServiceTest { processTaskRelationUpstream0.setPreTaskVersion(1); processTaskRelationUpstream0.setProjectCode(projectCode); processTaskRelationUpstream0.setPreTaskCode(123); + processTaskRelationUpstream0.setProcessDefinitionCode(123); ProcessTaskRelation processTaskRelationUpstream1 = new ProcessTaskRelation(); processTaskRelationUpstream1.setPostTaskCode(taskCode); processTaskRelationUpstream1.setPreTaskVersion(1); processTaskRelationUpstream1.setPreTaskCode(123); + processTaskRelationUpstream0.setProcessDefinitionCode(124); processTaskRelationUpstream1.setProjectCode(projectCode); ProcessTaskRelation processTaskRelationUpstream2 = new ProcessTaskRelation(); processTaskRelationUpstream2.setPostTaskCode(taskCode); processTaskRelationUpstream2.setPreTaskVersion(2); processTaskRelationUpstream1.setPreTaskCode(123); + processTaskRelationUpstream0.setProcessDefinitionCode(125); processTaskRelationUpstream2.setProjectCode(projectCode); List processTaskRelationList = new ArrayList<>(); processTaskRelationList.add(processTaskRelationUpstream0); @@ -382,4 +385,80 @@ public class ProcessTaskRelationServiceTest { Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); Assert.assertEquals(2, ((List) relation.get("data")).size()); } + + @Test + public void testDeleteDownstreamRelation() { + long projectCode = 1L; + long taskCode = 2L; + Project project = getProject(projectCode); + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, projectCode); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + List processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode); + Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(processTaskRelationList); + List> countListGroupByProcessDefinitionCode = new ArrayList<>(); + countListGroupByProcessDefinitionCode.add(new HashMap() { + { + put(123L, 2); + } + }); + countListGroupByProcessDefinitionCode.add(new HashMap() { + { + put(124L, 1); + } + }); + countListGroupByProcessDefinitionCode.add(new HashMap() { + { + put(125L, 3); + } + }); + Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode); + Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new ArrayList())).thenReturn(3); + Mockito.when(processTaskRelationMapper.deleteBatchIds(new ArrayList())).thenReturn(3); + Map result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode); + Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS)); + } + + @Test + public void testDeleteUpstreamRelation() { + long projectCode = 1L; + long taskCode = 2L; + Project project = getProject(projectCode); + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, projectCode); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + List processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode); + Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(processTaskRelationList); + List> countListGroupByProcessDefinitionCode = new ArrayList<>(); + countListGroupByProcessDefinitionCode.add(new HashMap() { + { + put(123L, 2); + } + }); + countListGroupByProcessDefinitionCode.add(new HashMap() { + { + put(124L, 1); + } + }); + countListGroupByProcessDefinitionCode.add(new HashMap() { + { + put(125L, 3); + } + }); + Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode); + Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new ArrayList())).thenReturn(3); + Mockito.when(processTaskRelationMapper.deleteBatchIds(new ArrayList())).thenReturn(3); + Map result1 = processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode, "123", taskCode); + Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS)); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index 6d4c79be61..a03f8f0b08 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.ibatis.annotations.Param; import java.util.List; +import java.util.Map; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @@ -101,6 +102,45 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode); + /** + * query task relation by codes + * + * @param projectCode projectCode + * @param taskCode taskCode + * @param postTaskCodes postTaskCodes list + * @return ProcessTaskRelation + */ + List queryDownstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("postTaskCodes") Long[] postTaskCodes); + + /** + * query task relation by codes + * + * @param projectCode projectCode + * @param taskCode taskCode + * @param preTaskCodes preTaskCode list + * @return ProcessTaskRelation + */ + List queryUpstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[] preTaskCodes); + + /** + * count upstream by codes + * + * @param projectCode projectCode + * @param taskCode taskCode + * @param processDefinitionCodes processDefinitionCodes + * @return upstream count list group by process definition code + */ + List> countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long projectCode, + @Param("processDefinitionCodes") Long[] processDefinitionCodes, @Param("taskCode") long taskCode); + + /** + * batch update process task relation pre task + * + * @param processTaskRelationList process task relation list + * @return update num + */ + int batchUpdateProcessTaskRelationPreTask(@Param("processTaskRelationList") List processTaskRelationList); + /** * query by code * diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index 11602eba4c..95a6550c0c 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -92,6 +92,62 @@ and post_task_code = #{taskCode} + + + + + + + + + update t_ds_process_task_relation + + pre_task_code=#{processTaskRelation.preTaskCode}, + pre_task_version=#{processTaskRelation.preTaskVersion} + + + WHERE id = #{processTaskRelation.id} + + + +