From a24a81685aaddbf86ad6388e9f2e31b5ce404b14 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Tue, 25 Jan 2022 20:17:33 +0800 Subject: [PATCH] fix relation delete (#8190) --- .../ProcessTaskRelationController.java | 44 +- .../service/ProcessTaskRelationService.java | 16 - .../impl/ProcessTaskRelationServiceImpl.java | 382 +++++------------- .../impl/TaskDefinitionServiceImpl.java | 4 +- .../ProcessTaskRelationServiceTest.java | 131 ++---- 5 files changed, 149 insertions(+), 428 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java index 54887b3f9e..710c8ac209 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java @@ -101,46 +101,6 @@ public class ProcessTaskRelationController extends BaseController { return returnDataList(result); } - /** - * move task to other processDefinition - * - * @param loginUser login user info - * @param projectCode project code - * @param processDefinitionCode process definition code - * @param targetProcessDefinitionCode target process definition code - * @param taskCode the current task code (the post task code) - * @return move result code - */ - @ApiOperation(value = "moveRelation", notes = "MOVE_TASK_TO_OTHER_PROCESS_DEFINITION_NOTES") - @ApiImplicitParams({ - @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), - @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, type = "Long"), - @ApiImplicitParam(name = "targetProcessDefinitionCode", value = "TARGET_PROCESS_DEFINITION_CODE", required = true, type = "Long"), - @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long") - }) - @PostMapping(value = "/move") - @ResponseStatus(HttpStatus.OK) - @ApiException(MOVE_PROCESS_TASK_RELATION_ERROR) - @AccessLogAnnotation(ignoreRequestArgs = "loginUser") - public Result moveTaskProcessRelation(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(name = "processDefinitionCode", required = true) long processDefinitionCode, - @RequestParam(name = "targetProcessDefinitionCode", required = true) long targetProcessDefinitionCode, - @RequestParam(name = "taskCode", required = true) long taskCode) { - Map result = new HashMap<>(); - if (processDefinitionCode == 0L) { - putMsg(result, DATA_IS_NOT_VALID, "processDefinitionCode"); - } else if (targetProcessDefinitionCode == 0L) { - putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode"); - } else if (taskCode == 0L) { - putMsg(result, DATA_IS_NOT_VALID, "taskCode"); - } else { - result = processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode, - targetProcessDefinitionCode, taskCode); - } - return returnDataList(result); - } - /** * delete process task relation (delete task from workflow) * @@ -179,7 +139,7 @@ public class ProcessTaskRelationController extends BaseController { @ApiOperation(value = "deleteUpstreamRelation", notes = "DELETE_UPSTREAM_RELATION_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), - @ApiImplicitParam(name = "preTaskCodes", value = "PRE_TASK_CODES", required = true, type = "String", example = "3,4"), + @ApiImplicitParam(name = "preTaskCodes", value = "PRE_TASK_CODES", required = true, type = "String", example = "1,2"), @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long") }) @DeleteMapping(value = "/{taskCode}/upstream") @@ -205,7 +165,7 @@ public class ProcessTaskRelationController extends BaseController { @ApiOperation(value = "deleteDownstreamRelation", notes = "DELETE_DOWNSTREAM_RELATION_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), - @ApiImplicitParam(name = "postTaskCodes", value = "POST_TASK_CODES", required = true, type = "String", example = "3,4"), + @ApiImplicitParam(name = "postTaskCodes", value = "POST_TASK_CODES", required = true, type = "String", example = "1,2"), @ApiImplicitParam(name = "taskCode", value = "TASK_CODE", required = true, type = "Long") }) @DeleteMapping(value = "/{taskCode}/downstream") diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java index 4246823fe0..f31cc8be48 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java @@ -42,22 +42,6 @@ public interface ProcessTaskRelationService { long preTaskCode, long postTaskCode); - /** - * move task to other processDefinition - * - * @param loginUser login user info - * @param projectCode project code - * @param processDefinitionCode process definition code - * @param targetProcessDefinitionCode target process definition code - * @param taskCode the current task code (the post task code) - * @return move result code - */ - Map moveTaskProcessRelation(User loginUser, - long projectCode, - long processDefinitionCode, - long targetProcessDefinitionCode, - long taskCode); - /** * delete process task relation * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index a53bbe79f7..a864c61cd4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.api.service.impl; -import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID; - import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; @@ -26,7 +24,6 @@ import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; @@ -47,7 +44,7 @@ import org.apache.commons.collections.CollectionUtils; import java.util.ArrayList; import java.util.Date; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -58,8 +55,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; /** @@ -191,111 +186,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return processTaskRelationLog; } - /** - * move task to other processDefinition - * - * @param loginUser login user info - * @param projectCode project code - * @param processDefinitionCode process definition code - * @param targetProcessDefinitionCode target process definition code - * @param taskCode the current task code (the post task code) - * @return move result code - */ - @Transactional(rollbackFor = RuntimeException.class) - @Override - public Map moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) { - Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(targetProcessDefinitionCode); - if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, targetProcessDefinitionCode); - return result; - } - if (processDefinition.getProjectCode() != projectCode) { - putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH); - return result; - } - List downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L); - if (CollectionUtils.isNotEmpty(downstreamList)) { - Set postTaskCodes = downstreamList - .stream() - .map(ProcessTaskRelation::getPostTaskCode) - .collect(Collectors.toSet()); - putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ",")); - return result; - } - List upstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode); - if (upstreamList.isEmpty()) { - putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, "taskCode:" + taskCode); - return result; - } else { - Set preTaskCodes = upstreamList - .stream() - .map(ProcessTaskRelation::getPreTaskCode) - .collect(Collectors.toSet()); - if (preTaskCodes.size() > 1 || !preTaskCodes.contains(0L)) { - putMsg(result, Status.TASK_HAS_UPSTREAM, org.apache.commons.lang.StringUtils.join(preTaskCodes, ",")); - return result; - } - } - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); - if (null == taskDefinition) { - putMsg(result, Status.DATA_IS_NULL, "taskDefinition"); - return result; - } - ObjectNode paramNode = JSONUtils.parseObject(taskDefinition.getTaskParams()); - if (TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())) { - Set depProcessDefinitionCodes = new HashSet<>(); - ObjectNode dependence = (ObjectNode) paramNode.get("dependence"); - ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); - for (int i = 0; i < dependTaskList.size(); i++) { - ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); - ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); - for (int j = 0; j < dependItemList.size(); j++) { - ObjectNode dependItem = (ObjectNode) dependItemList.path(j); - long definitionCode = dependItem.get("definitionCode").asLong(); - depProcessDefinitionCodes.add(definitionCode); - } - } - if (depProcessDefinitionCodes.contains(targetProcessDefinitionCode)) { - putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode"); - return result; - } - } - if (TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) { - long subProcessDefinitionCode = paramNode.get("processDefinitionCode").asLong(); - if (targetProcessDefinitionCode == subProcessDefinitionCode) { - putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode"); - return result; - } - } - updateProcessDefiniteVersion(loginUser, result, processDefinition); - Date now = new Date(); - ProcessTaskRelation processTaskRelation = upstreamList.get(0); - ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation); - processTaskRelation.setProcessDefinitionCode(processDefinition.getCode()); - processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); - processTaskRelation.setUpdateTime(now); - processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); - processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); - processTaskRelationLog.setUpdateTime(now); - processTaskRelationLog.setOperator(loginUser.getId()); - processTaskRelationLog.setOperateTime(now); - int update = processTaskRelationMapper.updateById(processTaskRelation); - int updateLog = processTaskRelationLogMapper.updateById(processTaskRelationLog); - if (update == 0 || updateLog == 0) { - putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR); - throw new ServiceException(Status.MOVE_PROCESS_TASK_RELATION_ERROR); - } else { - putMsg(result, Status.SUCCESS); - } - return result; - } - private void updateProcessDefiniteVersion(User loginUser, Map result, ProcessDefinition processDefinition) { int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); if (insertVersion <= 0) { @@ -337,28 +227,25 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); return result; } - List downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L); - if (CollectionUtils.isNotEmpty(downstreamList)) { - Set postTaskCodes = downstreamList - .stream() - .map(ProcessTaskRelation::getPostTaskCode) - .collect(Collectors.toSet()); - putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ",")); + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + if (CollectionUtils.isEmpty(processTaskRelationList)) { + putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); return result; } - updateProcessDefiniteVersion(loginUser, result, processDefinition); - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setProjectCode(projectCode); - processTaskRelationLog.setPostTaskCode(taskCode); - processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion()); - processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); - processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); - int deleteRelation = processTaskRelationMapper.deleteRelation(processTaskRelationLog); - int deleteRelationLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); - if (0 == deleteRelation || 0 == deleteRelationLog) { - putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR); - throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); + List downstreamList = Lists.newArrayList(); + for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { + if (processTaskRelation.getPreTaskCode() == taskCode) { + downstreamList.add(processTaskRelation.getPostTaskCode()); + } + if (processTaskRelation.getPostTaskCode() == taskCode) { + processTaskRelationList.remove(processTaskRelation); + } + } + if (CollectionUtils.isNotEmpty(downstreamList)) { + putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ",")); + return result; } + updateRelation(loginUser, result, processDefinition, processTaskRelationList); if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType()) || TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType()) || TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) { @@ -372,6 +259,21 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } + private void updateRelation(User loginUser, Map result, ProcessDefinition processDefinition, + List processTaskRelationList) { + updateProcessDefiniteVersion(loginUser, result, processDefinition); + List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), + processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE); + if (insertResult == Constants.EXIT_CODE_SUCCESS) { + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); + } else { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + } + /** * delete task upstream relation * @@ -394,11 +296,42 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.DATA_IS_NULL, "preTaskCodes"); return result; } - Status status = deleteUpstreamRelation(loginUser, projectCode, - Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode); - if (status != Status.SUCCESS) { - putMsg(result, status); + List upstreamList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + if (CollectionUtils.isEmpty(upstreamList)) { + putMsg(result, Status.DATA_IS_NULL, "taskCode"); + return result; + } + + List preTaskCodeList = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toList()); + if (preTaskCodeList.contains(0L)) { + putMsg(result, Status.DATA_IS_NULL, "preTaskCodes"); + return result; + } + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode()); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode()); + return result; } + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); + List processTaskRelationWaitRemove = Lists.newArrayList(); + for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { + if (preTaskCodeList.size() > 1) { + if (preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) { + preTaskCodeList.remove(processTaskRelation.getPreTaskCode()); + processTaskRelationWaitRemove.add(processTaskRelation); + } + } else { + if (processTaskRelation.getPostTaskCode() == taskCode) { + processTaskRelation.setPreTaskVersion(0); + processTaskRelation.setPreTaskCode(0L); + } + } + if (preTaskCodeList.contains(processTaskRelation.getPostTaskCode())) { + processTaskRelationWaitRemove.add(processTaskRelation); + } + } + processTaskRelationList.removeAll(processTaskRelationWaitRemove); + updateRelation(loginUser, result, processDefinition, processTaskRelationList); return result; } @@ -424,38 +357,24 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.DATA_IS_NULL, "postTaskCodes"); return result; } - List processTaskRelationList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode); - Map taskRelationLogMap = - processTaskRelationList.stream() - .map(ProcessTaskRelationLog::new) - .collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog)); - Set postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); - int delete = 0; - int deleteLog = 0; - Set processCodeSet = new HashSet<>(); - for (long postTaskCode : postTaskCodesSet) { - ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode); - if (processTaskRelationLog != null) { - delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); - deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); - processCodeSet.add(processTaskRelationLog.getProcessDefinitionCode()); - } + List downstreamList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode); + if (CollectionUtils.isEmpty(downstreamList)) { + putMsg(result, Status.DATA_IS_NULL, "taskCode"); + return result; } - for (long code : processCodeSet) { - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null) { - throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST); - } - int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); - if (insertVersion <= 0) { - throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); - } + List postTaskCodeList = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toList()); + if (postTaskCodeList.contains(0L)) { + putMsg(result, Status.DATA_IS_NULL, "postTaskCodes"); + return result; } - if ((delete & deleteLog) == 0) { - throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); - } else { - putMsg(result, Status.SUCCESS); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode()); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, downstreamList.get(0).getProcessDefinitionCode()); + return result; } + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); + processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode); + updateRelation(loginUser, result, processDefinition, processTaskRelationList); return result; } @@ -555,47 +474,44 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); return result; } - List processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode); + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); if (CollectionUtils.isEmpty(processTaskRelationList)) { putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); return result; } - if (processTaskRelationList.size() > 1) { - putMsg(result, Status.DATA_IS_NOT_VALID, "processTaskRelationList"); - return result; + Map> taskRelationMap = new HashMap<>(); + for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { + taskRelationMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + v.add(processTaskRelation); + return v; + }); } - ProcessTaskRelation processTaskRelation = processTaskRelationList.get(0); - int upstreamCount = processTaskRelationMapper.countByCode(projectCode, processTaskRelation.getProcessDefinitionCode(), - 0L, processTaskRelation.getPostTaskCode()); - - if (upstreamCount == 0) { - putMsg(result, Status.DATA_IS_NULL, "upstreamCount"); + if (!taskRelationMap.containsKey(postTaskCode)) { + putMsg(result, Status.DATA_IS_NULL, "postTaskCode"); return result; } - if (upstreamCount > 1) { - int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId()); - if (delete == 0) { - putMsg(result, Status.DELETE_EDGE_ERROR); + if (taskRelationMap.get(postTaskCode).size() > 1) { + for (ProcessTaskRelation processTaskRelation : taskRelationMap.get(postTaskCode)) { + if (processTaskRelation.getPreTaskCode() == preTaskCode) { + int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId()); + if (delete == 0) { + putMsg(result, Status.DELETE_EDGE_ERROR); + throw new ServiceException(Status.DELETE_EDGE_ERROR); + } + processTaskRelationList.remove(processTaskRelation); + } } - return result; - } - updateProcessDefiniteVersion(loginUser, result, processDefinition); - processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); - processTaskRelation.setPreTaskVersion(0); - processTaskRelation.setPreTaskCode(0L); - Date now = new Date(); - processTaskRelation.setUpdateTime(now); - int update = processTaskRelationMapper.updateById(processTaskRelation); - processTaskRelation.setId(0); - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); - processTaskRelationLog.setCreateTime(now); - processTaskRelationLog.setOperator(loginUser.getId()); - processTaskRelationLog.setOperateTime(now); - int insert = processTaskRelationLogMapper.insert(processTaskRelationLog); - if ((update & insert) == 0) { - putMsg(result, Status.DELETE_EDGE_ERROR); - throw new ServiceException(Status.DELETE_EDGE_ERROR); + } else { + ProcessTaskRelation processTaskRelation = taskRelationMap.get(postTaskCode).get(0); + processTaskRelationList.remove(processTaskRelation); + processTaskRelation.setPreTaskVersion(0); + processTaskRelation.setPreTaskCode(0L); + processTaskRelationList.add(processTaskRelation); } + updateRelation(loginUser, result, processDefinition, processTaskRelationList); return result; } @@ -627,80 +543,4 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } }; } - - /** - * delete upstream relation - * - * @param projectCode project code - * @param preTaskCodes pre task codes - * @param taskCode pre task code - * @return status - */ - private Status deleteUpstreamRelation(User loginUser, long projectCode, Long[] preTaskCodes, long taskCode) { - List upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes); - if (CollectionUtils.isEmpty(upstreamList)) { - return Status.SUCCESS; - } - List upstreamLogList = new ArrayList<>(); - Date now = new Date(); - for (ProcessTaskRelation processTaskRelation : upstreamList) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); - processTaskRelationLog.setOperator(loginUser.getId()); - processTaskRelationLog.setOperateTime(now); - processTaskRelationLog.setUpdateTime(now); - upstreamLogList.add(processTaskRelationLog); - } - Map> processTaskRelationListGroupByProcessDefinitionCode = upstreamLogList.stream() - .collect(Collectors.groupingBy(ProcessTaskRelationLog::getProcessDefinitionCode)); - // count upstream relation group by process definition code - List> countListGroupByProcessDefinitionCode = processTaskRelationMapper - .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode); - - List deletes = new ArrayList<>(); - List updates = new ArrayList<>(); - for (Map codeCountMap : countListGroupByProcessDefinitionCode) { - long processDefinitionCode = codeCountMap.get("processDefinitionCode"); - long countValue = codeCountMap.get("countValue"); - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); - if (processDefinition == null) { - return Status.PROCESS_DEFINE_NOT_EXIST; - } - int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); - if (insertVersion <= 0) { - throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); - } - List processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode); - if (countValue <= processTaskRelationLogList.size()) { - ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0); - if (processTaskRelationLog.getPreTaskCode() != 0) { - processTaskRelationLog.setPreTaskCode(0); - processTaskRelationLog.setPreTaskVersion(0); - } - processTaskRelationLog.setProcessDefinitionVersion(insertVersion); - updates.add(processTaskRelationLog); - } - if (!processTaskRelationLogList.isEmpty()) { - deletes.addAll(processTaskRelationLogList); - } - } - deletes.addAll(updates); - int delete = 0; - int deleteLog = 0; - for (ProcessTaskRelationLog processTaskRelationLog : deletes) { - delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); - deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); - } - if ((delete & deleteLog) == 0) { - throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); - } else { - if (!updates.isEmpty()) { - int insert = processTaskRelationMapper.batchInsert(updates); - int insertLog = processTaskRelationLogMapper.batchInsert(updates); - if ((insert & insertLog) == 0) { - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); - } - } - } - return Status.SUCCESS; - } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 749ebac8a4..2ed0f61a08 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -201,7 +201,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); return result; } - if (taskDefinition.getFlag() == Flag.YES) { + if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode); return result; } @@ -577,7 +577,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } } taskDefinition.setFlag(Flag.YES); - taskDefinitionLog.setFlag(Flag.NO); + taskDefinitionLog.setFlag(Flag.YES); break; default: putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java index 5db786853e..f21de2aed4 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java @@ -289,48 +289,6 @@ public class ProcessTaskRelationServiceTest { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } - @Test - public void testMoveTaskProcessRelation() { - long projectCode = 1L; - long processDefinitionCode = 1L; - long taskCode = 1L; - - Project project = getProject(projectCode); - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); - - Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition()); - Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L)).thenReturn(Lists.newArrayList()); - Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition()); - List processTaskRelationList = Lists.newArrayList(); - ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); - processTaskRelation.setProjectCode(projectCode); - processTaskRelation.setProcessDefinitionCode(processDefinitionCode); - processTaskRelation.setPreTaskCode(0L); - processTaskRelation.setPreTaskVersion(0); - processTaskRelation.setPostTaskCode(taskCode); - processTaskRelation.setPostTaskVersion(1); - processTaskRelationList.add(processTaskRelation); - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setProjectCode(projectCode); - processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); - processTaskRelationLog.setPreTaskCode(0L); - processTaskRelationLog.setPreTaskVersion(0); - processTaskRelationLog.setPostTaskCode(taskCode); - processTaskRelationLog.setPostTaskVersion(1); - Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList); - Mockito.when(processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation)).thenReturn(processTaskRelationLog); - Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1); - Mockito.when(processTaskRelationLogMapper.updateById(processTaskRelationLog)).thenReturn(1); - Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - } - @Test public void testQueryDownstreamRelation() { long projectCode = 1L; @@ -479,38 +437,23 @@ public class ProcessTaskRelationServiceTest { loginUser.setUserType(UserType.GENERAL_USER); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); + List processTaskRelationList = Lists.newArrayList(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(projectCode); + processTaskRelation.setProcessDefinitionCode(1L); + processTaskRelation.setPreTaskCode(0L); + processTaskRelation.setPreTaskVersion(0); + processTaskRelation.setPostTaskCode(taskCode); + processTaskRelation.setPostTaskVersion(1); + processTaskRelationList.add(processTaskRelation); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(Lists.newArrayList()); - List> countListGroupByProcessDefinitionCode = new ArrayList<>(); - countListGroupByProcessDefinitionCode.add(new HashMap() { - { - put("processDefinitionCode", 123L); - put("countValue", 2L); - } - }); - countListGroupByProcessDefinitionCode.add(new HashMap() { - { - put("processDefinitionCode", 124L); - put("countValue", 1L); - } - }); - countListGroupByProcessDefinitionCode.add(new HashMap() { - { - put("processDefinitionCode", 125L); - put("countValue", 3L); - } - }); - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setProjectCode(projectCode); - processTaskRelationLog.setPreTaskCode(0L); - processTaskRelationLog.setPreTaskVersion(0); - processTaskRelationLog.setPostTaskCode(taskCode); - processTaskRelationLog.setPostTaskVersion(2); - Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode); - Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); - Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); - Map result1 = processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode, "123", taskCode); - Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS)); + Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList); + Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(getProcessDefinition()); + Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList); + List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L, + 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @Test @@ -530,26 +473,24 @@ public class ProcessTaskRelationServiceTest { putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(Lists.newArrayList()); - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setProjectCode(projectCode); - processTaskRelationLog.setPreTaskCode(taskCode); - processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); - Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition()); Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition()); TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setTaskType(TaskType.CONDITIONS.getDesc()); Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition); - Mockito.when(taskDefinitionMapper.deleteByCode(taskCode)).thenReturn(1); - processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setProjectCode(projectCode); - processTaskRelationLog.setPostTaskCode(taskCode); - processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); - processTaskRelationLog.setProcessDefinitionVersion(1); - Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); - Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); - Mockito.when(processService.saveProcessDefine(loginUser, getProcessDefinition(), Boolean.TRUE, Boolean.TRUE)).thenReturn(1); - result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode); + List processTaskRelationList = Lists.newArrayList(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(projectCode); + processTaskRelation.setProcessDefinitionCode(1L); + processTaskRelation.setPreTaskCode(0L); + processTaskRelation.setPreTaskVersion(0); + processTaskRelation.setPostTaskCode(taskCode); + processTaskRelation.setPostTaskVersion(1); + processTaskRelationList.add(processTaskRelation); + Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)).thenReturn(processTaskRelationList); + List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L, + 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @@ -578,15 +519,11 @@ public class ProcessTaskRelationServiceTest { processTaskRelationLog.setOperator(loginUser.getId()); List processTaskRelationList = new ArrayList<>(); processTaskRelationList.add(processTaskRelation); - Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList); - Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1); - Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1); - Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1); - Mockito.when(processTaskRelationLogMapper.insert(processTaskRelationLog)).thenReturn(1); - ProcessDefinition processDefinition = getProcessDefinition(); - Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition); - Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1); - result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList); + List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + Mockito.when(processService.saveTaskRelation(loginUser, 1L, 1L, + 1, relationLogs, Lists.newArrayList(), Boolean.TRUE)).thenReturn(0); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } }