From b265297fc86108501ce8ca306a38fcfed1eb2fe5 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Thu, 2 Dec 2021 14:06:39 +0800 Subject: [PATCH] [Improvement][API] fix 6772 new api bug (#7098) * add processDefinition releaseWorkflowAndSchedule * add ProcessTaskRelationServiceImpl.moveTaskProcessRelation * add dependency and subprocess judgement * fix api bug * fix codestyle --- .../ProcessTaskRelationController.java | 2 +- .../impl/ProcessDefinitionServiceImpl.java | 120 ++++----- .../impl/ProcessTaskRelationServiceImpl.java | 249 ++++++++++-------- .../impl/TaskDefinitionServiceImpl.java | 37 ++- .../service/ProcessDefinitionServiceTest.java | 7 + .../ProcessTaskRelationServiceTest.java | 91 ++++--- .../TaskDefinitionServiceImplTest.java | 27 +- .../mapper/ProcessTaskRelationLogMapper.java | 9 + .../dao/mapper/ProcessTaskRelationMapper.java | 16 +- .../mapper/ProcessTaskRelationLogMapper.xml | 16 +- .../dao/mapper/ProcessTaskRelationMapper.xml | 28 +- .../service/process/ProcessService.java | 3 + 12 files changed, 346 insertions(+), 259 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java index cd9f5633dd..de911e1e42 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java @@ -142,7 +142,7 @@ public class ProcessTaskRelationController extends BaseController { } /** - * delete process task relation + * delete process task relation (delete task from workflow) * * @param loginUser login user * @param projectCode project code diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index abe070629a..0cc1f3b082 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -233,7 +233,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description, - globalParams, locations, timeout, loginUser.getId(), tenantId); + globalParams, locations, timeout, loginUser.getId(), tenantId); processDefinition.setExecutionType(executionType); return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs); @@ -300,8 +300,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } List processTaskRelations = taskRelationList.stream() - .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class)) - .collect(Collectors.toList()); + .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class)) + .collect(Collectors.toList()); List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); if (taskNodeList.size() != taskRelationList.size()) { Set postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet()); @@ -412,7 +412,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Page page = new Page<>(pageNo, pageSize); IPage processDefinitionIPage = processDefinitionMapper.queryDefineListPaging( - page, searchVal, userId, project.getCode(), isAdmin(loginUser)); + page, searchVal, userId, project.getCode(), isAdmin(loginUser)); List records = processDefinitionIPage.getRecords(); for (ProcessDefinition pd : records) { @@ -591,7 +591,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), - processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); + processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); if (insertResult == Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); @@ -685,11 +685,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } int delete = processDefinitionMapper.deleteById(processDefinition.getId()); - int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); - if (delete == 0 || deleteRelation == 0) { + if (delete == 0) { putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); } + int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); + if (deleteRelation == 0) { + logger.warn("The process definition has not relation, it will be delete successfully"); + } putMsg(result, Status.SUCCESS); return result; } @@ -1156,7 +1159,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()))); List taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode())); Map taskDefinitionMap = taskDefinitionList.stream() - .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); + .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); if (limit > processInstanceList.size()) { limit = processInstanceList.size(); @@ -1171,8 +1174,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessInstance processInstance = processInstanceList.get(i); Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), processInstance.getProcessDefinitionCode(), - "", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(), - DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); + "", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(), + DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); } List parentTreeViewDtoList = new ArrayList<>(); @@ -1363,7 +1366,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro diffCode.forEach(code -> failedProcessList.add(code + "[null]")); for (ProcessDefinition processDefinition : processDefinitionList) { List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); + processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); List taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); processDefinition.setProjectCode(targetProjectCode); if (isCopy) { @@ -1532,13 +1535,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * create empty process definition * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param description description + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param description description * @param globalParams globalParams - * @param timeout timeout - * @param tenantCode tenantCode + * @param timeout timeout + * @param tenantCode tenantCode * @param scheduleJson scheduleJson * @return process definition code */ @@ -1584,7 +1587,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description, - globalParams, "", timeout, loginUser.getId(), tenantId); + globalParams, "", timeout, loginUser.getId(), tenantId); processDefinition.setExecutionType(executionType); result = createEmptyDagDefine(loginUser, processDefinition); if (result.get(Constants.STATUS) != Status.SUCCESS) { @@ -1596,7 +1599,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } // save dag schedule - Map scheduleResult = createDagSchedule(loginUser, project, processDefinition, scheduleJson); + Map scheduleResult = createDagSchedule(loginUser, processDefinition, scheduleJson); if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) { Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS); putMsg(result, scheduleResultStatus); @@ -1617,10 +1620,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - private Map createDagSchedule(User loginUser, - Project project, - ProcessDefinition processDefinition, - String scheduleJson) { + private Map createDagSchedule(User loginUser, ProcessDefinition processDefinition, String scheduleJson) { Map result = new HashMap<>(); Schedule scheduleObj = JSONUtils.parseObject(scheduleJson, Schedule.class); if (scheduleObj == null) { @@ -1651,12 +1651,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro scheduleObj.setEnvironmentCode(scheduleObj.getEnvironmentCode() == null ? -1 : scheduleObj.getEnvironmentCode()); scheduleMapper.insert(scheduleObj); - /** - * updateProcessInstance receivers and cc by process definition id - */ - processDefinition.setWarningGroupId(scheduleObj.getWarningGroupId()); - processDefinitionMapper.updateById(processDefinition); - putMsg(result, Status.SUCCESS); result.put("scheduleId", scheduleObj.getId()); return result; @@ -1665,15 +1659,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * update process definition basic info * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param code process definition code - * @param description description - * @param globalParams globalParams - * @param timeout timeout - * @param tenantCode tenantCode - * @param scheduleJson scheduleJson + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param code process definition code + * @param description description + * @param globalParams globalParams + * @param timeout timeout + * @param tenantCode tenantCode + * @param scheduleJson scheduleJson * @param executionType executionType * @return update result code */ @@ -1728,7 +1722,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); processDefinition.set(projectCode, name, description, globalParams, "", timeout, tenantId); processDefinition.setExecutionType(executionType); - result = updateDagDefineBasicInfo(loginUser, processDefinition, processDefinitionDeepCopy); + List taskRelationList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); + result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, Lists.newArrayList()); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -1746,26 +1741,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - private Map updateDagDefineBasicInfo(User loginUser, - ProcessDefinition processDefinition, - ProcessDefinition processDefinitionDeepCopy) { - Map result = new HashMap<>(); - int insertVersion; - if (processDefinition.equals(processDefinitionDeepCopy)) { - insertVersion = processDefinitionDeepCopy.getVersion(); - } else { - processDefinition.setUpdateTime(new Date()); - insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); - } - if (insertVersion == 0) { - putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); - throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); - } - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); - return result; - } - private Map updateDagSchedule(User loginUser, long projectCode, long processDefinitionCode, @@ -1791,24 +1766,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro param.setTimezoneId(schedule.getTimezoneId()); return schedulerService.updateScheduleByProcessDefinitionCode( - loginUser, - projectCode, - processDefinitionCode, - JSONUtils.toJsonString(param), - warningType, - warningGroupId, - failureStrategy, - processInstancePriority, - workerGroup, - environmentCode); + loginUser, + projectCode, + processDefinitionCode, + JSONUtils.toJsonString(param), + warningType, + warningGroupId, + failureStrategy, + processInstancePriority, + workerGroup, + environmentCode); } /** * release process definition and schedule * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code + * @param loginUser login user + * @param projectCode project code + * @param code process definition code * @param releaseState releaseState * @return update result code */ @@ -1846,6 +1821,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } processDefinition.setReleaseState(releaseState); processDefinitionMapper.updateById(processDefinition); + scheduleObj.setReleaseState(ReleaseState.ONLINE); scheduleMapper.updateById(scheduleObj); break; case OFFLINE: diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index d6c606c27a..b49a08915b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -90,11 +91,11 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P /** * create process task relation * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param processDefinitionCode processDefinitionCode - * @param preTaskCode preTaskCode - * @param postTaskCode postTaskCode + * @param preTaskCode preTaskCode + * @param postTaskCode postTaskCode * @return create result code */ @Transactional(rollbackFor = RuntimeException.class) @@ -162,6 +163,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P int insertLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs); if ((insert & insertLog) > 0) { putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); } return result; } @@ -173,6 +177,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); processTaskRelationLog.setPostTaskCode(taskDefinition.getCode()); processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion()); + processTaskRelationLog.setConditionType(ConditionType.NONE); + processTaskRelationLog.setConditionParams("{}"); processTaskRelationLog.setCreateTime(now); processTaskRelationLog.setUpdateTime(now); processTaskRelationLog.setOperator(userId); @@ -183,13 +189,14 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P /** * move task to other processDefinition * - * @param loginUser login user info - * @param projectCode project code - * @param processDefinitionCode process definition code + * @param loginUser login user info + * @param projectCode project code + * @param processDefinitionCode process definition code * @param targetProcessDefinitionCode target process definition code - * @param taskCode the current task code (the post task code) + * @param taskCode the current task code (the post task code) * @return move result code */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) { Project project = projectMapper.queryByCode(projectCode); @@ -263,12 +270,22 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } Date now = new Date(); ProcessTaskRelation processTaskRelation = upstreamList.get(0); + ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation); processTaskRelation.setProcessDefinitionCode(processDefinition.getCode()); processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); processTaskRelation.setUpdateTime(now); + processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); + processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); + processTaskRelationLog.setUpdateTime(now); + processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setOperateTime(now); int update = processTaskRelationMapper.updateById(processTaskRelation); - if (update == 0) { + int updateLog = processTaskRelationLogMapper.updateById(processTaskRelationLog); + if (update == 0 || updateLog == 0) { putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR); + throw new ServiceException(Status.MOVE_PROCESS_TASK_RELATION_ERROR); + } else { + putMsg(result, Status.SUCCESS); } return result; } @@ -282,6 +299,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P * @param taskCode the post task code * @return delete result code */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map deleteTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long taskCode) { Project project = projectMapper.queryByCode(projectCode); @@ -294,38 +312,48 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR); return result; } + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); + return result; + } + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); + if (null == taskDefinition) { + putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); + return result; + } List downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L); if (CollectionUtils.isNotEmpty(downstreamList)) { Set postTaskCodes = downstreamList - .stream() - .map(ProcessTaskRelation::getPostTaskCode) - .collect(Collectors.toSet()); + .stream() + .map(ProcessTaskRelation::getPostTaskCode) + .collect(Collectors.toSet()); putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ",")); return result; } ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); processTaskRelationLog.setProjectCode(projectCode); - processTaskRelationLog.setPreTaskCode(taskCode); + processTaskRelationLog.setPostTaskCode(taskCode); + processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion()); processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); + processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); int deleteRelation = processTaskRelationMapper.deleteRelation(processTaskRelationLog); - if (0 == deleteRelation) { + int deleteRelationLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); + if (0 == deleteRelation || 0 == deleteRelationLog) { putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR); - } - - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); - if (null == taskDefinition) { - putMsg(result, Status.DATA_IS_NULL, "taskDefinition"); - return result; + throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); } if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType()) - || TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType()) - || TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) { + || TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType()) + || TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) { int deleteTaskDefinition = taskDefinitionMapper.deleteByCode(taskCode); if (0 == deleteTaskDefinition) { - putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR); + putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); + throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); } } + putMsg(result, Status.SUCCESS); return result; } @@ -338,6 +366,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P * @param taskCode the post task code * @return delete result code */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map deleteUpstreamRelation(User loginUser, long projectCode, String preTaskCodes, long taskCode) { Project project = projectMapper.queryByCode(projectCode); @@ -347,11 +376,11 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } if (StringUtils.isEmpty(preTaskCodes)) { - putMsg(result,Status.DATA_IS_NULL,"preTaskCodes"); + putMsg(result, Status.DATA_IS_NULL, "preTaskCodes"); return result; } - Set preTaskCodesSet = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); - Status status = deleteUpstreamRelation(projectCode, preTaskCodesSet.toArray(new Long[0]), taskCode); + Status status = deleteUpstreamRelation(loginUser.getId(), projectCode, + Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode); if (status != Status.SUCCESS) { putMsg(result, status); } @@ -367,6 +396,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P * @param taskCode the pre task code * @return delete result code */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map deleteDownstreamRelation(User loginUser, long projectCode, String postTaskCodes, long taskCode) { Project project = projectMapper.queryByCode(projectCode); @@ -376,26 +406,28 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } if (StringUtils.isEmpty(postTaskCodes)) { - putMsg(result,Status.DATA_IS_NULL,"postTaskCodes"); + putMsg(result, Status.DATA_IS_NULL, "postTaskCodes"); return result; } + List processTaskRelationList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode); + Map taskRelationLogMap = + processTaskRelationList.stream() + .map(ProcessTaskRelationLog::new) + .collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog)); Set postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); - List deleteFailedCodeList = new ArrayList<>(); - postTaskCodesSet.stream().forEach( - postTaskCode -> { - try { - Status status = deleteUpstreamRelation(projectCode, new Long[]{taskCode}, postTaskCode); - if (Status.SUCCESS != status) { - deleteFailedCodeList.add(postTaskCode); - } - } catch (Exception e) { - deleteFailedCodeList.add(postTaskCode); - } - - } - ); - if (!deleteFailedCodeList.isEmpty()) { - putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR, String.join(",", deleteFailedCodeList.stream().map(o -> o + "").collect(Collectors.toList()))); + int delete = 0; + int deleteLog = 0; + for (long postTaskCode : postTaskCodesSet) { + ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode); + if (processTaskRelationLog != null) { + delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); + deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); + } + } + if ((delete & deleteLog) == 0) { + throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); + } else { + putMsg(result, Status.SUCCESS); } return result; } @@ -420,15 +452,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P List taskDefinitionLogList = new ArrayList<>(); if (CollectionUtils.isNotEmpty(processTaskRelationList)) { Set taskDefinitions = processTaskRelationList - .stream() - .map(processTaskRelation -> { - TaskDefinition taskDefinition = buildTaskDefinition(); - taskDefinition.setProjectCode(processTaskRelation.getProjectCode()); - taskDefinition.setCode(processTaskRelation.getPreTaskCode()); - taskDefinition.setVersion(processTaskRelation.getPreTaskVersion()); - return taskDefinition; - }) - .collect(Collectors.toSet()); + .stream() + .map(processTaskRelation -> { + TaskDefinition taskDefinition = buildTaskDefinition(); + taskDefinition.setProjectCode(processTaskRelation.getProjectCode()); + taskDefinition.setCode(processTaskRelation.getPreTaskCode()); + taskDefinition.setVersion(processTaskRelation.getPreTaskVersion()); + return taskDefinition; + }) + .collect(Collectors.toSet()); taskDefinitionLogList = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions); } result.put(Constants.DATA_LIST, taskDefinitionLogList); @@ -456,15 +488,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P List taskDefinitionLogList = new ArrayList<>(); if (CollectionUtils.isNotEmpty(processTaskRelationList)) { Set taskDefinitions = processTaskRelationList - .stream() - .map(processTaskRelation -> { - TaskDefinition taskDefinition = buildTaskDefinition(); - taskDefinition.setProjectCode(processTaskRelation.getProjectCode()); - taskDefinition.setCode(processTaskRelation.getPostTaskCode()); - taskDefinition.setVersion(processTaskRelation.getPostTaskVersion()); - return taskDefinition; - }) - .collect(Collectors.toSet()); + .stream() + .map(processTaskRelation -> { + TaskDefinition taskDefinition = buildTaskDefinition(); + taskDefinition.setProjectCode(processTaskRelation.getProjectCode()); + taskDefinition.setCode(processTaskRelation.getPostTaskCode()); + taskDefinition.setVersion(processTaskRelation.getPostTaskVersion()); + return taskDefinition; + }) + .collect(Collectors.toSet()); taskDefinitionLogList = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions); } result.put(Constants.DATA_LIST, taskDefinitionLogList); @@ -478,8 +510,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P * @param loginUser login user * @param projectCode project code * @param processDefinitionCode process definition code - * @param preTaskCode pre task code - * @param postTaskCode post task code + * @param preTaskCode pre task code + * @param postTaskCode post task code * @return delete result code */ @Override @@ -501,7 +533,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } ProcessTaskRelation processTaskRelation = processTaskRelationList.get(0); int upstreamCount = processTaskRelationMapper.countByCode(projectCode, processTaskRelation.getProcessDefinitionCode(), - 0L, processTaskRelation.getPostTaskCode()); + 0L, processTaskRelation.getPostTaskCode()); if (upstreamCount == 0) { putMsg(result, Status.DATA_IS_NULL, "upstreamCount"); @@ -541,8 +573,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } TaskDefinition that = (TaskDefinition) o; return getCode() == that.getCode() - && getVersion() == that.getVersion() - && getProjectCode() == that.getProjectCode(); + && getVersion() == that.getVersion() + && getProjectCode() == that.getProjectCode(); } @Override @@ -560,52 +592,61 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P * @param taskCode pre task code * @return status */ - private Status deleteUpstreamRelation(long projectCode, Long[] preTaskCodes, long taskCode) { + private Status deleteUpstreamRelation(int userId, long projectCode, Long[] preTaskCodes, long taskCode) { List upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes); if (CollectionUtils.isEmpty(upstreamList)) { return Status.SUCCESS; } - Map> processTaskRelationListGroupByProcessDefinitionCode = upstreamList.stream() - .collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode)); + List upstreamLogList = new ArrayList<>(); + Date now = new Date(); + for (ProcessTaskRelation processTaskRelation : upstreamList) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + processTaskRelationLog.setOperator(userId); + processTaskRelationLog.setOperateTime(now); + processTaskRelationLog.setUpdateTime(now); + upstreamLogList.add(processTaskRelationLog); + } + Map> processTaskRelationListGroupByProcessDefinitionCode = upstreamLogList.stream() + .collect(Collectors.groupingBy(ProcessTaskRelationLog::getProcessDefinitionCode)); // count upstream relation group by process definition code - List> countListGroupByProcessDefinitionCode = processTaskRelationMapper - .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode); - - List deletes = new ArrayList<>(); - List updates = new ArrayList<>(); - - countListGroupByProcessDefinitionCode.stream().forEach( - processDefinitionCodeUpstreamCountMap -> - processDefinitionCodeUpstreamCountMap.entrySet().stream().forEach( - o -> { - Long processDefinitionCode = o.getKey(); - Integer count = o.getValue(); - List processTaskRelationList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode); - if (count <= processTaskRelationList.size()) { - ProcessTaskRelation processTaskRelation = processTaskRelationList.remove(0); - if (processTaskRelation.getPreTaskCode() != 0) { - processTaskRelation.setPreTaskCode(0); - processTaskRelation.setPreTaskVersion(0); - updates.add(processTaskRelation); - } - } - if (!processTaskRelationList.isEmpty()) { - deletes.addAll(processTaskRelationList); - } - } - ) - ); - - int update = 0; - if (!updates.isEmpty()) { - update = processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(updates); + List> countListGroupByProcessDefinitionCode = processTaskRelationMapper + .countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode); + + List deletes = new ArrayList<>(); + List updates = new ArrayList<>(); + for (Map codeCountMap : countListGroupByProcessDefinitionCode) { + long processDefinitionCode = codeCountMap.get("processDefinitionCode"); + long countValue = codeCountMap.get("countValue"); + List processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode); + if (countValue <= processTaskRelationLogList.size()) { + ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0); + if (processTaskRelationLog.getPreTaskCode() != 0) { + processTaskRelationLog.setPreTaskCode(0); + processTaskRelationLog.setPreTaskVersion(0); + updates.add(processTaskRelationLog); + } + } + if (!processTaskRelationLogList.isEmpty()) { + deletes.addAll(processTaskRelationLogList); + } } + deletes.addAll(updates); int delete = 0; - if (!deletes.isEmpty()) { - delete = processTaskRelationMapper.deleteBatchIds(deletes.stream().map(ProcessTaskRelation::getId).collect(Collectors.toList())); + int deleteLog = 0; + for (ProcessTaskRelationLog processTaskRelationLog : deletes) { + delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); + deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); } - if (update < 0 || delete < 0) { - return Status.DELETE_TASK_PROCESS_RELATION_ERROR; + if ((delete & deleteLog) == 0) { + throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); + } else { + if (!updates.isEmpty()) { + int insert = processTaskRelationMapper.batchInsert(updates); + int insertLog = processTaskRelationLogMapper.batchInsert(updates); + if ((insert & insertLog) == 0) { + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } + } } return Status.SUCCESS; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 5655f33cb2..ea0152526e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -32,10 +32,12 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; @@ -89,6 +91,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; + @Autowired + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + @Autowired private ProcessService processService; @@ -255,7 +260,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe taskDefinitionToUpdate.setId(taskDefinition.getId()); taskDefinitionToUpdate.setProjectCode(projectCode); taskDefinitionToUpdate.setUserId(taskDefinition.getUserId()); - taskDefinitionToUpdate.setVersion(version + 1); + taskDefinitionToUpdate.setVersion(++version); taskDefinitionToUpdate.setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase()); taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate)); taskDefinitionToUpdate.setUpdateTime(now); @@ -268,6 +273,36 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } + List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); + if (!processTaskRelationList.isEmpty()) { + List processTaskRelationLogList = new ArrayList<>(); + int delete = 0; + int deleteLog = 0; + for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); + deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); + if (processTaskRelationLog.getPreTaskCode() == taskCode) { + processTaskRelationLog.setPreTaskVersion(version); + } + if (processTaskRelationLog.getPostTaskCode() == taskCode) { + processTaskRelationLog.setPostTaskVersion(version); + } + processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setOperateTime(now); + processTaskRelationLog.setUpdateTime(now); + processTaskRelationLogList.add(processTaskRelationLog); + } + if ((delete & deleteLog) == 0) { + throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); + } else { + int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList); + int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList); + if ((insertRelation & insertRelationLog) == 0) { + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } + } + } result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS, update); return result; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 22413c55c2..a7358e45c5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -429,6 +429,13 @@ public class ProcessDefinitionServiceTest { // project check auth success, processs definition online putMsg(result, Status.SUCCESS, projectCode); Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition()); + List processTaskRelationList = new ArrayList<>(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(projectCode); + processTaskRelation.setProcessDefinitionCode(46L); + processTaskRelation.setPostTaskCode(123L); + processTaskRelationList.add(processTaskRelation); + Mockito.when(processService.findRelationByCode(projectCode, 46L)).thenReturn(processTaskRelationList); Map onlineRes = processDefinitionService.releaseProcessDefinition( loginUser, projectCode, 46, ReleaseState.ONLINE); Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java index 0de0c917c8..37a4bb16d7 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java @@ -192,13 +192,13 @@ public class ProcessTaskRelationServiceTest { processTaskRelationUpstream1.setPostTaskCode(taskCode); processTaskRelationUpstream1.setPreTaskVersion(1); processTaskRelationUpstream1.setPreTaskCode(123); - processTaskRelationUpstream0.setProcessDefinitionCode(124); + processTaskRelationUpstream1.setProcessDefinitionCode(124); processTaskRelationUpstream1.setProjectCode(projectCode); ProcessTaskRelation processTaskRelationUpstream2 = new ProcessTaskRelation(); processTaskRelationUpstream2.setPostTaskCode(taskCode); processTaskRelationUpstream2.setPreTaskVersion(2); - processTaskRelationUpstream1.setPreTaskCode(123); - processTaskRelationUpstream0.setProcessDefinitionCode(125); + processTaskRelationUpstream2.setPreTaskCode(123); + processTaskRelationUpstream2.setProcessDefinitionCode(125); processTaskRelationUpstream2.setProjectCode(projectCode); List processTaskRelationList = new ArrayList<>(); processTaskRelationList.add(processTaskRelationUpstream0); @@ -238,6 +238,7 @@ public class ProcessTaskRelationServiceTest { processDefinition.setTenantId(1); processDefinition.setDescription(""); processDefinition.setCode(1L); + processDefinition.setVersion(1); return processDefinition; } @@ -312,8 +313,17 @@ public class ProcessTaskRelationServiceTest { processTaskRelation.setPostTaskCode(taskCode); processTaskRelation.setPostTaskVersion(1); processTaskRelationList.add(processTaskRelation); + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setProjectCode(projectCode); + processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); + processTaskRelationLog.setPreTaskCode(0L); + processTaskRelationLog.setPreTaskVersion(0); + processTaskRelationLog.setPostTaskCode(taskCode); + processTaskRelationLog.setPostTaskVersion(1); Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList); + Mockito.when(processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation)).thenReturn(processTaskRelationLog); Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1); + Mockito.when(processTaskRelationLogMapper.updateById(processTaskRelationLog)).thenReturn(1); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @@ -435,27 +445,17 @@ public class ProcessTaskRelationServiceTest { Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - List processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode); - Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(processTaskRelationList); - List> countListGroupByProcessDefinitionCode = new ArrayList<>(); - countListGroupByProcessDefinitionCode.add(new HashMap() { - { - put(123L, 2); - } - }); - countListGroupByProcessDefinitionCode.add(new HashMap() { - { - put(124L, 1); - } - }); - countListGroupByProcessDefinitionCode.add(new HashMap() { - { - put(125L, 3); - } - }); - Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode); - Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new ArrayList())).thenReturn(3); - Mockito.when(processTaskRelationMapper.deleteBatchIds(new ArrayList())).thenReturn(3); + List processTaskRelationList = new ArrayList<>(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(projectCode); + processTaskRelation.setProcessDefinitionCode(1L); + processTaskRelation.setPreTaskCode(taskCode); + processTaskRelation.setPostTaskCode(123L); + processTaskRelationList.add(processTaskRelation); + Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList); + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); + Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Map result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode); Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS)); } @@ -473,27 +473,35 @@ public class ProcessTaskRelationServiceTest { Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - List processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode); - Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(processTaskRelationList); - List> countListGroupByProcessDefinitionCode = new ArrayList<>(); - countListGroupByProcessDefinitionCode.add(new HashMap() { + Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(Lists.newArrayList()); + List> countListGroupByProcessDefinitionCode = new ArrayList<>(); + countListGroupByProcessDefinitionCode.add(new HashMap() { { - put(123L, 2); + put("processDefinitionCode", 123L); + put("countValue", 2L); } }); - countListGroupByProcessDefinitionCode.add(new HashMap() { + countListGroupByProcessDefinitionCode.add(new HashMap() { { - put(124L, 1); + put("processDefinitionCode", 124L); + put("countValue", 1L); } }); - countListGroupByProcessDefinitionCode.add(new HashMap() { + countListGroupByProcessDefinitionCode.add(new HashMap() { { - put(125L, 3); + put("processDefinitionCode", 125L); + put("countValue", 3L); } }); + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setProjectCode(projectCode); + processTaskRelationLog.setPreTaskCode(0L); + processTaskRelationLog.setPreTaskVersion(0); + processTaskRelationLog.setPostTaskCode(taskCode); + processTaskRelationLog.setPostTaskVersion(2); Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode); - Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new ArrayList())).thenReturn(3); - Mockito.when(processTaskRelationMapper.deleteBatchIds(new ArrayList())).thenReturn(3); + Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); + Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Map result1 = processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode, "123", taskCode); Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS)); } @@ -501,8 +509,8 @@ public class ProcessTaskRelationServiceTest { @Test public void testDeleteTaskProcessRelation() { long projectCode = 1L; - long taskCode = 2L; - long processDefinitionCode = 3L; + long taskCode = 1L; + long processDefinitionCode = 1L; long preTaskCode = 4L; long postTaskCode = 5L; Project project = getProject(projectCode); @@ -520,10 +528,19 @@ public class ProcessTaskRelationServiceTest { processTaskRelationLog.setPreTaskCode(taskCode); processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); + Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition()); + Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition()); TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setTaskType(TaskType.CONDITIONS.getDesc()); Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition); Mockito.when(taskDefinitionMapper.deleteByCode(taskCode)).thenReturn(1); + processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setProjectCode(projectCode); + processTaskRelationLog.setPostTaskCode(taskCode); + processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); + processTaskRelationLog.setProcessDefinitionVersion(1); + Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); + Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index 6defcc5fa8..94f2ad5889 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.service.impl.TaskDefinitionServiceImpl; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -42,7 +43,6 @@ import java.util.List; import java.util.Map; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -50,7 +50,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -@Ignore @RunWith(MockitoJUnitRunner.class) public class TaskDefinitionServiceImplTest { @@ -74,7 +73,6 @@ public class TaskDefinitionServiceImplTest { @Mock private ProcessTaskRelationMapper processTaskRelationMapper; - ; @Test public void createTaskDefinition() { @@ -166,6 +164,7 @@ public class TaskDefinitionServiceImplTest { @Test public void deleteTaskDefinitionByCode() { long projectCode = 1L; + long taskCode = 1L; Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); @@ -177,13 +176,14 @@ public class TaskDefinitionServiceImplTest { Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Mockito.when(processTaskRelationMapper.queryByTaskCode(Mockito.anyLong())) + Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition()); + Mockito.when(processTaskRelationMapper.queryDownstreamByTaskCode(taskCode)) .thenReturn(new ArrayList<>()); - Mockito.when(taskDefinitionMapper.deleteByCode(Mockito.anyLong())) + Mockito.when(taskDefinitionMapper.deleteByCode(taskCode)) .thenReturn(1); Map relation = taskDefinitionService - .deleteTaskDefinitionByCode(loginUser, projectCode, Mockito.anyLong()); + .deleteTaskDefinitionByCode(loginUser, projectCode, taskCode); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); } @@ -241,6 +241,15 @@ public class TaskDefinitionServiceImplTest { return project; } + private TaskDefinition getTaskDefinition() { + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setProjectCode(1L); + taskDefinition.setCode(1L); + taskDefinition.setVersion(1); + taskDefinition.setTaskType(TaskType.SHELL.getDesc()); + return taskDefinition; + } + @Test public void checkJson() { String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" @@ -305,12 +314,8 @@ public class TaskDefinitionServiceImplTest { Map onlineTaskResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.ONLINE); Assert.assertEquals(Status.SUCCESS, onlineTaskResult.get(Constants.STATUS)); - // process definition online, resource does not exist - Map onlineResResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.ONLINE); - Assert.assertEquals(Status.SUCCESS, onlineResResult.get(Constants.STATUS)); - // release error code Map failResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.getEnum(2)); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS)); } -} +} \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java index b086377365..e4ffa49955 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.mapper; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.ibatis.annotations.Param; @@ -65,4 +66,12 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode); - /** - * query task relation by codes - * - * @param projectCode projectCode - * @param taskCode taskCode - * @param postTaskCodes postTaskCodes list - * @return ProcessTaskRelation - */ - List queryDownstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("postTaskCodes") Long[] postTaskCodes); - /** * query task relation by codes * @@ -130,8 +120,9 @@ public interface ProcessTaskRelationMapper extends BaseMapper> countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long projectCode, - @Param("processDefinitionCodes") Long[] processDefinitionCodes, @Param("taskCode") long taskCode); + List> countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long projectCode, + @Param("processDefinitionCodes") Long[] processDefinitionCodes, + @Param("taskCode") long taskCode); /** * batch update process task relation pre task @@ -176,4 +167,5 @@ public interface ProcessTaskRelationMapper extends BaseMapper and pre_task_code = #{processTaskRelationLog.preTaskCode} and pre_task_version = #{processTaskRelationLog.preTaskVersion} + and post_task_code = #{processTaskRelationLog.postTaskCode} - and post_task_version = #{processTaskRelationLog.post_task_version} + and post_task_version = #{processTaskRelationLog.postTaskVersion} + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index 66633d381e..4598ec0ead 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -92,20 +92,6 @@ and post_task_code = #{taskCode} - - - + select process_definition_code as processDefinitionCode, + count(0) as countValue from t_ds_process_task_relation WHERE project_code = #{projectCode} and post_task_code = #{taskCode} @@ -172,10 +158,12 @@ WHERE project_code = #{processTaskRelationLog.projectCode} and process_definition_code = #{processTaskRelationLog.processDefinitionCode} and process_definition_version = #{processTaskRelationLog.processDefinitionVersion} - and pre_task_code = #{processTaskRelationLog.preTaskCode} - and pre_task_version = #{processTaskRelationLog.preTaskVersion} + + and pre_task_code = #{processTaskRelationLog.preTaskCode} + and pre_task_version = #{processTaskRelationLog.preTaskVersion} + and post_task_code = #{processTaskRelationLog.postTaskCode} - and post_task_version = #{processTaskRelationLog.post_task_version} + and post_task_version = #{processTaskRelationLog.postTaskVersion}