From 6bf8063fe8b8491d33412452dfd58f81ab88c7c8 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Wed, 19 Jan 2022 17:14:41 +0800 Subject: [PATCH] [Bug] [dolphinscheduler] fix duplicate key (#8111) * fix #8043 * fix null when no change Co-authored-by: caishunfeng <534328519@qq.com> --- .../impl/ProcessDefinitionServiceImpl.java | 52 +++++++++------ .../impl/ProcessTaskRelationServiceImpl.java | 65 +++++++++++++++++-- .../impl/TaskDefinitionServiceImpl.java | 47 ++++++++++---- .../service/ProcessDefinitionServiceTest.java | 1 + .../ProcessTaskRelationServiceTest.java | 17 ++++- .../service/process/ProcessService.java | 33 +++++++++- 6 files changed, 176 insertions(+), 39 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index dc768499a2..cfbeb25cc0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -577,29 +577,43 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } - int insertVersion; - if (processDefinition.equals(processDefinitionDeepCopy)) { - insertVersion = processDefinitionDeepCopy.getVersion(); - ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(processDefinition.getCode(), insertVersion); - processDefinitionLog.setOperator(loginUser.getId()); - processDefinitionLog.setOperateTime(new Date()); - int update = processDefinitionLogMapper.updateById(processDefinitionLog); + boolean isChange = false; + if (processDefinition.equals(processDefinitionDeepCopy) && saveTaskResult == Constants.EXIT_CODE_SUCCESS) { + List processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); + if (taskRelationList.size() == processTaskRelationLogList.size()) { + Map taskRelationLogMap = + taskRelationList.stream().collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog)); + for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { + if (!processTaskRelationLog.equals(taskRelationLogMap.get(processTaskRelationLog.getPostTaskCode()))) { + isChange = true; + break; + } + } + } else { + isChange = true; + } } else { - processDefinition.setUpdateTime(new Date()); - insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); - } - if (insertVersion == 0) { - putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); - throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + isChange = true; } - int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), - processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); - if (insertResult == Constants.EXIT_CODE_SUCCESS) { + if (isChange) { + processDefinition.setUpdateTime(new Date()); + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); + if (insertVersion <= 0) { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), + processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); + if (insertResult == Constants.EXIT_CODE_SUCCESS) { + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); + } else { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + } else { putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); - } else { - putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); - throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index b49a08915b..a53bbe79f7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.commons.collections.CollectionUtils; @@ -88,6 +89,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P @Autowired private ProcessDefinitionMapper processDefinitionMapper; + @Autowired + private ProcessService processService; + /** * create process task relation * @@ -135,6 +139,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } } } + updateProcessDefiniteVersion(loginUser, result, processDefinition); Date now = new Date(); List processTaskRelationLogs = new ArrayList<>(); if (preTaskCode != 0L) { @@ -268,6 +273,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } } + updateProcessDefiniteVersion(loginUser, result, processDefinition); Date now = new Date(); ProcessTaskRelation processTaskRelation = upstreamList.get(0); ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation); @@ -290,6 +296,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } + private void updateProcessDefiniteVersion(User loginUser, Map result, ProcessDefinition processDefinition) { + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); + if (insertVersion <= 0) { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + processDefinition.setVersion(insertVersion); + } + /** * delete process task relation * @@ -331,7 +346,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ",")); return result; } - + updateProcessDefiniteVersion(loginUser, result, processDefinition); ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); processTaskRelationLog.setProjectCode(projectCode); processTaskRelationLog.setPostTaskCode(taskCode); @@ -379,7 +394,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.DATA_IS_NULL, "preTaskCodes"); return result; } - Status status = deleteUpstreamRelation(loginUser.getId(), projectCode, + Status status = deleteUpstreamRelation(loginUser, projectCode, Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode); if (status != Status.SUCCESS) { putMsg(result, status); @@ -417,11 +432,23 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P Set postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); int delete = 0; int deleteLog = 0; + Set processCodeSet = new HashSet<>(); for (long postTaskCode : postTaskCodesSet) { ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode); if (processTaskRelationLog != null) { delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); + processCodeSet.add(processTaskRelationLog.getProcessDefinitionCode()); + } + } + for (long code : processCodeSet) { + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); + if (processDefinition == null) { + throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST); + } + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); + if (insertVersion <= 0) { + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } } if ((delete & deleteLog) == 0) { @@ -514,6 +541,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P * @param postTaskCode post task code * @return delete result code */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode) { Project project = projectMapper.queryByCode(projectCode); @@ -522,6 +550,11 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); + return result; + } List processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode); if (CollectionUtils.isEmpty(processTaskRelationList)) { putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); @@ -546,11 +579,22 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } return result; } + updateProcessDefiniteVersion(loginUser, result, processDefinition); + processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); processTaskRelation.setPreTaskVersion(0); processTaskRelation.setPreTaskCode(0L); + Date now = new Date(); + processTaskRelation.setUpdateTime(now); int update = processTaskRelationMapper.updateById(processTaskRelation); - if (update == 0) { + processTaskRelation.setId(0); + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + processTaskRelationLog.setCreateTime(now); + processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setOperateTime(now); + int insert = processTaskRelationLogMapper.insert(processTaskRelationLog); + if ((update & insert) == 0) { putMsg(result, Status.DELETE_EDGE_ERROR); + throw new ServiceException(Status.DELETE_EDGE_ERROR); } return result; } @@ -592,7 +636,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P * @param taskCode pre task code * @return status */ - private Status deleteUpstreamRelation(int userId, long projectCode, Long[] preTaskCodes, long taskCode) { + private Status deleteUpstreamRelation(User loginUser, long projectCode, Long[] preTaskCodes, long taskCode) { List upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes); if (CollectionUtils.isEmpty(upstreamList)) { return Status.SUCCESS; @@ -601,7 +645,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P Date now = new Date(); for (ProcessTaskRelation processTaskRelation : upstreamList) { ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); - processTaskRelationLog.setOperator(userId); + processTaskRelationLog.setOperator(loginUser.getId()); processTaskRelationLog.setOperateTime(now); processTaskRelationLog.setUpdateTime(now); upstreamLogList.add(processTaskRelationLog); @@ -617,14 +661,23 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P for (Map codeCountMap : countListGroupByProcessDefinitionCode) { long processDefinitionCode = codeCountMap.get("processDefinitionCode"); long countValue = codeCountMap.get("countValue"); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + if (processDefinition == null) { + return Status.PROCESS_DEFINE_NOT_EXIST; + } + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); + if (insertVersion <= 0) { + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } List processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode); if (countValue <= processTaskRelationLogList.size()) { ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0); if (processTaskRelationLog.getPreTaskCode() != 0) { processTaskRelationLog.setPreTaskCode(0); processTaskRelationLog.setPreTaskVersion(0); - updates.add(processTaskRelationLog); } + processTaskRelationLog.setProcessDefinitionVersion(insertVersion); + updates.add(processTaskRelationLog); } if (!processTaskRelationLogList.isEmpty()) { deletes.addAll(processTaskRelationLogList); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 1c52cd12a9..749ebac8a4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -31,12 +31,14 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; @@ -94,6 +96,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + @Autowired private ProcessService processService; @@ -178,6 +183,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * @param projectCode project code * @param taskCode task code */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) { Project project = projectMapper.queryByCode(projectCode); @@ -213,23 +219,35 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); if (!taskRelationList.isEmpty()) { int deleteRelation = 0; - int deleteRelationLog = 0; for (ProcessTaskRelation processTaskRelation : taskRelationList) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); - deleteRelation += processTaskRelationMapper.deleteRelation(processTaskRelationLog); - deleteRelationLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); + deleteRelation += processTaskRelationMapper.deleteById(processTaskRelation.getId()); } - if ((deleteRelation & deleteRelationLog) == 0) { + if (deleteRelation == 0) { throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); } + long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); + updateProcessDefiniteVersion(loginUser, processDefinitionCode); } putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); + throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); } return result; } + private int updateProcessDefiniteVersion(User loginUser, long processDefinitionCode) { + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + if (processDefinition == null) { + throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST); + } + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); + if (insertVersion <= 0) { + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + return insertVersion; + } + /** * update task definition * @@ -290,27 +308,35 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } + handleRelation(loginUser, taskCode, version, now); + result.put(Constants.DATA_LIST, taskCode); + putMsg(result, Status.SUCCESS, update); + return result; + } + + private void handleRelation(User loginUser, long taskCode, Integer version, Date now) { List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); if (!processTaskRelationList.isEmpty()) { + long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode(); + int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode); List processTaskRelationLogList = new ArrayList<>(); int delete = 0; - int deleteLog = 0; for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); - deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); if (processTaskRelationLog.getPreTaskCode() == taskCode) { processTaskRelationLog.setPreTaskVersion(version); } if (processTaskRelationLog.getPostTaskCode() == taskCode) { processTaskRelationLog.setPostTaskVersion(version); } + processTaskRelationLog.setProcessDefinitionVersion(definiteVersion); processTaskRelationLog.setOperator(loginUser.getId()); processTaskRelationLog.setOperateTime(now); processTaskRelationLog.setUpdateTime(now); processTaskRelationLogList.add(processTaskRelationLog); } - if ((delete & deleteLog) == 0) { + if (delete == 0) { throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); } else { int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList); @@ -320,9 +346,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } } } - result.put(Constants.DATA_LIST, taskCode); - putMsg(result, Status.SUCCESS, update); - return result; } /** @@ -352,10 +375,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version); taskDefinitionUpdate.setUserId(loginUser.getId()); + Date now = new Date(); taskDefinitionUpdate.setUpdateTime(new Date()); taskDefinitionUpdate.setId(taskDefinition.getId()); int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate); if (switchVersion > 0) { + handleRelation(loginUser, taskCode, version, now); result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS); } else { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index c06dc23a45..497c3d3382 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -667,6 +667,7 @@ public class ProcessDefinitionServiceTest { processDefinition.setTenantId(1); processDefinition.setDescription(""); processDefinition.setCode(46L); + processDefinition.setVersion(1); return processDefinition; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java index 37a4bb16d7..5db786853e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections.CollectionUtils; @@ -88,6 +89,9 @@ public class ProcessTaskRelationServiceTest { @Mock private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + @Mock + private ProcessService processService; + /** * get Mock Admin User * @@ -456,6 +460,9 @@ public class ProcessTaskRelationServiceTest { ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); + ProcessDefinition processDefinition = getProcessDefinition(); + Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(processDefinition); + Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1); Map result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode); Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS)); } @@ -541,6 +548,7 @@ public class ProcessTaskRelationServiceTest { processTaskRelationLog.setProcessDefinitionVersion(1); Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); + Mockito.when(processService.saveProcessDefine(loginUser, getProcessDefinition(), Boolean.TRUE, Boolean.TRUE)).thenReturn(1); result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @@ -549,7 +557,7 @@ public class ProcessTaskRelationServiceTest { public void testDeleteEdge() { long projectCode = 1L; long processDefinitionCode = 3L; - long preTaskCode = 4L; + long preTaskCode = 0L; long postTaskCode = 5L; Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); @@ -563,14 +571,21 @@ public class ProcessTaskRelationServiceTest { ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); processTaskRelation.setProjectCode(projectCode); processTaskRelation.setProcessDefinitionCode(processDefinitionCode); + processTaskRelation.setProcessDefinitionVersion(1); processTaskRelation.setPreTaskCode(preTaskCode); processTaskRelation.setPostTaskCode(postTaskCode); + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + processTaskRelationLog.setOperator(loginUser.getId()); List processTaskRelationList = new ArrayList<>(); processTaskRelationList.add(processTaskRelation); Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList); Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1); Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1); Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1); + Mockito.when(processTaskRelationLogMapper.insert(processTaskRelationLog)).thenReturn(1); + ProcessDefinition processDefinition = getProcessDefinition(); + Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition); + Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1); result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index bc39ca3bd9..c2147239ef 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2184,7 +2184,7 @@ public class ProcessService { if (result > 0) { result = switchProcessTaskRelationVersion(processDefinitionLog); if (result <= 0) { - return Constants.DEFINITION_FAILURE; + return Constants.EXIT_CODE_FAILURE; } } return result; @@ -2196,7 +2196,36 @@ public class ProcessService { processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode()); } List processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); - return processTaskRelationMapper.batchInsert(processTaskRelationLogList); + int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelationLogList); + if (batchInsert == 0) { + return Constants.EXIT_CODE_FAILURE; + } else { + int result = 0; + for (ProcessTaskRelationLog taskRelationLog : processTaskRelationLogList) { + int switchResult = switchTaskDefinitionVersion(taskRelationLog.getPostTaskCode(), taskRelationLog.getPostTaskVersion()); + if (switchResult != Constants.EXIT_CODE_FAILURE) { + result++; + } + } + return result; + } + } + + public int switchTaskDefinitionVersion(long taskCode, int taskVersion) { + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); + if (taskDefinition == null) { + return Constants.EXIT_CODE_FAILURE; + } + if (taskDefinition.getVersion() == taskVersion) { + return Constants.EXIT_CODE_SUCCESS; + } + TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskVersion); + if (taskDefinitionUpdate == null) { + return Constants.EXIT_CODE_FAILURE; + } + taskDefinitionUpdate.setUpdateTime(new Date()); + taskDefinitionUpdate.setId(taskDefinition.getId()); + return taskDefinitionMapper.updateById(taskDefinitionUpdate); } /**