From ca5af013ae6a2c9272ece92264060cf5ebbf80f2 Mon Sep 17 00:00:00 2001 From: insist777 <84278047+insist777@users.noreply.github.com> Date: Wed, 14 Dec 2022 14:57:57 +0800 Subject: [PATCH] [Fix] openapi version inconsistency when updating workflows, tasks and relationship (#13094) Co-authored-by: xiangzihao <460888207@qq.com> --- .../ProcessTaskRelationV2Controller.java | 3 +- .../dto/workflow/WorkflowUpdateRequest.java | 2 - .../dolphinscheduler/api/enums/Status.java | 1 + .../service/ProcessTaskRelationService.java | 8 +- .../impl/ProcessDefinitionServiceImpl.java | 107 +++++++++++++- .../impl/ProcessTaskRelationServiceImpl.java | 135 ++++++++++++++++-- .../impl/TaskDefinitionServiceImpl.java | 15 +- .../service/ProcessDefinitionServiceTest.java | 14 +- .../TaskDefinitionServiceImplTest.java | 108 +++++++++++++- 9 files changed, 360 insertions(+), 33 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java index 8430651220..b6798e39bf 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java @@ -125,7 +125,8 @@ public class ProcessTaskRelationV2Controller extends BaseController { @PathVariable("code") Long code, @RequestBody TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) { List processTaskRelations = processTaskRelationService - .updateUpstreamTaskDefinition(loginUser, code, taskRelationUpdateUpstreamRequest); + .updateUpstreamTaskDefinitionWithSyncDag(loginUser, code, Boolean.TRUE, + taskRelationUpdateUpstreamRequest); return Result.success(processTaskRelations); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java index 7e3d90d46a..c402285eff 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java @@ -103,8 +103,6 @@ public class WorkflowUpdateRequest { processDefinitionDeepCopy.setLocations(this.location); } - int version = processDefinitionDeepCopy.getVersion() + 1; - processDefinitionDeepCopy.setVersion(version); processDefinitionDeepCopy.setUpdateTime(new Date()); return processDefinitionDeepCopy; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index d0f82b3e99..0f1cb95b47 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -381,6 +381,7 @@ public enum Status { "批量创建工作流任务关系 {0} 错误"), PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error", "批量修改工作流任务关系错误"), + UPSTREAM_TASK_NOT_EXISTS(50071, "upstream task want to set dependence do not exists {0}", "指定的上游任务 {0} 不存在"), WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not finished, can not do this operation", "工作流实例未结束,不能执行此操作"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java index 7a31d49916..f5a1f7a082 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java @@ -86,11 +86,13 @@ public interface ProcessTaskRelationService { * * @param loginUser login user * @param taskCode relation upstream code + * @param needSyncDag needSyncDag * @param taskRelationUpdateUpstreamRequest relation downstream code */ - List updateUpstreamTaskDefinition(User loginUser, - long taskCode, - TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest); + List updateUpstreamTaskDefinitionWithSyncDag(User loginUser, + long taskCode, + Boolean needSyncDag, + TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest); /** * delete task upstream relation diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 5897807a0f..4b94af1ee7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service.impl; +import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST; @@ -2775,12 +2776,110 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } processDefinitionUpdate.setTenantId(tenant.getId()); } - int update = processDefinitionMapper.updateById(processDefinitionUpdate); - if (update <= 0) { + int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate); + if (insertVersion == 0) { + logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.", + processDefinitionUpdate.getCode(), + processDefinitionUpdate.getName()); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } - this.syncObj2Log(loginUser, processDefinition); - return processDefinition; + + int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion); + if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) { + logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.", + processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } + logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.", + processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); + processDefinitionUpdate.setVersion(insertVersion); + return processDefinitionUpdate; + } + + public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) { + ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); + Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()); + int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1; + processDefinitionLog.setVersion(insertVersion); + processDefinition.setVersion(insertVersion); + + processDefinitionLog.setOperator(loginUser.getId()); + processDefinition.setUserId(loginUser.getId()); + processDefinitionLog.setOperateTime(processDefinition.getUpdateTime()); + processDefinition.setUpdateTime(processDefinition.getUpdateTime()); + processDefinitionLog.setId(null); + int result = processDefinitionMapper.updateById(processDefinition); + + int insertLog = processDefinitionLogMapper.insert(processDefinitionLog); + processDefinitionLog.setId(processDefinition.getId()); + return (insertLog & result) > 0 ? insertVersion : 0; + } + + public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition, + int processDefinitionVersion) { + long projectCode = processDefinition.getProjectCode(); + long processDefinitionCode = processDefinition.getCode(); + List taskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List taskRelationList = + taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + + List taskCodeList = + taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList()); + List taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList); + List taskDefinitionLogs = + taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList()); + + if (taskRelationList.isEmpty()) { + return Constants.EXIT_CODE_SUCCESS; + } + Map taskDefinitionLogMap = null; + if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) { + taskDefinitionLogMap = taskDefinitionLogs + .stream() + .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog)); + } + Date now = new Date(); + for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { + processTaskRelationLog.setProjectCode(projectCode); + processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); + processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion); + if (taskDefinitionLogMap != null) { + TaskDefinitionLog preTaskDefinitionLog = + taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode()); + if (preTaskDefinitionLog != null) { + processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion()); + } + TaskDefinitionLog postTaskDefinitionLog = + taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode()); + if (postTaskDefinitionLog != null) { + processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion()); + } + } + processTaskRelationLog.setCreateTime(now); + processTaskRelationLog.setUpdateTime(now); + processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setOperateTime(now); + } + if (!taskRelations.isEmpty()) { + Set processTaskRelationSet = + taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet()); + Set taskRelationSet = + taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet()); + boolean isSame = CollectionUtils.isEqualCollection(processTaskRelationSet, + taskRelationSet); + if (isSame) { + logger.info("process task relations is non-existent, projectCode:{}, processCode:{}.", + processDefinition.getProjectCode(), processDefinition.getCode()); + return Constants.EXIT_CODE_SUCCESS; + } + processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); + } + List processTaskRelations = + taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList()); + int insert = processTaskRelationMapper.batchInsert(processTaskRelations); + int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList); + return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; } protected Map updateDagSchedule(User loginUser, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index df1b075d9d..4feff61e4a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service.impl; +import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; @@ -31,12 +32,14 @@ import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; @@ -93,6 +96,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P @Autowired private ProcessDefinitionMapper processDefinitionMapper; + @Autowired + private ProcessDefinitionLogMapper processDefinitionLogMapper; + @Autowired private ProcessService processService; @@ -402,13 +408,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P * * @param loginUser login user * @param taskCode relation upstream code + * @param needSyncDag needSyncDag * @param taskRelationUpdateUpstreamRequest relation downstream code */ @Override @Transactional - public List updateUpstreamTaskDefinition(User loginUser, - long taskCode, - TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) { + public List updateUpstreamTaskDefinitionWithSyncDag(User loginUser, + long taskCode, + Boolean needSyncDag, + TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) { TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode); if (downstreamTask == null) { throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode); @@ -436,17 +444,24 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, taskRelationUpdateUpstreamRequest.toString()); } - + processDefinition.setUpdateTime(new Date()); + int insertVersion = processDefinition.getVersion(); + if (needSyncDag) { + insertVersion = + this.saveProcessDefine(loginUser, processDefinition); + if (insertVersion <= 0) { + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + } // get new relation to create and out of date relation to delete List taskCodeCreates = upstreamTaskCodes .stream() .filter(upstreamTaskCode -> processTaskRelationExists.stream().noneMatch( processTaskRelationExist -> processTaskRelationExist.getPreTaskCode() == upstreamTaskCode)) .collect(Collectors.toList()); - List taskCodeDeletes = processTaskRelationExists - .stream() - .map(ProcessTaskRelation::getPreTaskCode) - .filter(preTaskCode -> !upstreamTaskCodes.contains(preTaskCode)) + List taskCodeDeletes = processTaskRelationExists.stream() + .filter(ptr -> !upstreamTaskCodes.contains(ptr.getPreTaskCode())) + .map(ProcessTaskRelation::getId) .collect(Collectors.toList()); // delete relation not exists @@ -460,10 +475,20 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P // create relation not exists List processTaskRelations = new ArrayList<>(); for (long createCode : taskCodeCreates) { - TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode); + long upstreamCode = 0L; + int version = 0; + if (createCode != 0L) { + // 0 for DAG root, should not, it may already exists and skip to create anymore + TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode); + if (upstreamTask == null) { + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, createCode); + } + upstreamCode = upstreamTask.getCode(); + version = upstreamTask.getVersion(); + } ProcessTaskRelation processTaskRelationCreate = new ProcessTaskRelation(null, processDefinition.getVersion(), downstreamTask.getProjectCode(), - processDefinition.getCode(), upstreamTask.getCode(), upstreamTask.getVersion(), + processDefinition.getCode(), upstreamCode, version, downstreamTask.getCode(), downstreamTask.getVersion(), null, null); processTaskRelations.add(processTaskRelationCreate); } @@ -473,10 +498,98 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } // batch sync to process task relation log - this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations); + int saveTaskRelationResult = saveTaskRelation(loginUser, processDefinition, insertVersion); + if (saveTaskRelationResult != Constants.EXIT_CODE_SUCCESS) { + logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.", + processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } + logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.", + processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); + processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion); return processTaskRelations; } + public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition, + int processDefinitionVersion) { + long projectCode = processDefinition.getProjectCode(); + long processDefinitionCode = processDefinition.getCode(); + List taskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List taskRelationList = + taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + + List taskCodeList = + taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList()); + List taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList); + List taskDefinitionLogs = + taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList()); + + if (taskRelationList.isEmpty()) { + return Constants.EXIT_CODE_SUCCESS; + } + Map taskDefinitionLogMap = null; + if (org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) { + taskDefinitionLogMap = taskDefinitionLogs + .stream() + .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog)); + } + Date now = new Date(); + for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { + processTaskRelationLog.setProjectCode(projectCode); + processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); + processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion); + if (taskDefinitionLogMap != null) { + TaskDefinitionLog preTaskDefinitionLog = + taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode()); + if (preTaskDefinitionLog != null) { + processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion()); + } + TaskDefinitionLog postTaskDefinitionLog = + taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode()); + if (postTaskDefinitionLog != null) { + processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion()); + } + } + processTaskRelationLog.setCreateTime(now); + processTaskRelationLog.setUpdateTime(now); + processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setOperateTime(now); + } + if (CollectionUtils.isNotEmpty(taskRelations)) { + Set processTaskRelationSet = + taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet()); + Set taskRelationSet = + taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet()); + boolean isSame = org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet, + taskRelationSet); + if (isSame) { + return Constants.EXIT_CODE_SUCCESS; + } + processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); + } + List processTaskRelations = + taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList()); + int insert = processTaskRelationMapper.batchInsert(processTaskRelations); + int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList); + return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; + } + + public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) { + ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); + Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()); + int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1; + processDefinitionLog.setVersion(insertVersion); + processDefinitionLog.setOperator(loginUser.getId()); + processDefinitionLog.setOperateTime(processDefinition.getUpdateTime()); + processDefinitionLog.setId(null); + int insertLog = processDefinitionLogMapper.insert(processDefinitionLog); + + processDefinitionLog.setId(processDefinition.getId()); + int result = processDefinitionMapper.updateById(processDefinitionLog); + return (insertLog & result) > 0 ? insertVersion : 0; + } + private void updateRelation(User loginUser, Map result, ProcessDefinition processDefinition, List processTaskRelationList) { List relationLogs = diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index b641d1d7f4..9abbb07253 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -214,8 +214,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe String upstreamCodes) { TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest = new TaskRelationUpdateUpstreamRequest(); taskRelationUpdateUpstreamRequest.setWorkflowCode(workflowCode); - taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes); - return processTaskRelationService.updateUpstreamTaskDefinition(user, taskCode, + if (upstreamCodes != null) { + taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes); + } + return processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(user, taskCode, Boolean.FALSE, taskRelationUpdateUpstreamRequest); } @@ -498,9 +500,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } } - private void updateDag(User loginUser, long processDefinitionCode, - List processTaskRelationList, - List taskDefinitionLogs) { + public void updateDag(User loginUser, long processDefinitionCode, + List processTaskRelationList, + List taskDefinitionLogs) { ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); if (processDefinition == null) { logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode); @@ -625,6 +627,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode); + if (CollectionUtils.isNotEmpty(taskRelationList)) { logger.info( "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", @@ -634,10 +637,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe .queryByProcessCode(taskDefinitionUpdate.getProjectCode(), processDefinitionCode); updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog)); } - this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(), taskUpdateRequest.getUpstreamTasksCodes()); - return taskDefinitionUpdate; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 8b600696b7..5ada289227 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -1068,11 +1068,23 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { ((ServiceException) exception).getCode()); // success + Mockito.when(processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode())) + .thenReturn(processDefinition.getVersion()); Mockito.when(processDefinitionMapper.updateById(isA(ProcessDefinition.class))).thenReturn(1); ProcessDefinition processDefinitionUpdate = processDefinitionService.updateSingleProcessDefinition(user, processDefinitionCode, workflowUpdateRequest); - Assertions.assertEquals(processDefinition, processDefinitionUpdate); + Assertions.assertNotNull(processDefinitionUpdate); + + // check version + Assertions.assertEquals(processDefinition.getVersion() + 1, processDefinitionUpdate.getVersion()); + } + + @Test + public void testCheckVersion() { + WorkflowFilterRequest workflowFilterRequest = new WorkflowFilterRequest(); + workflowFilterRequest.setWorkflowName(name); + } @Test diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index b6569a1e67..b6668dff15 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; @@ -40,18 +41,23 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.process.ProcessServiceImpl; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import java.text.MessageFormat; @@ -91,9 +97,18 @@ public class TaskDefinitionServiceImplTest { @Mock private ProjectServiceImpl projectService; + @InjectMocks + private ProcessServiceImpl processServiceImpl; + @Mock private ProcessService processService; + @Mock + private ProcessDefinitionLogMapper processDefineLogMapper; + + @Mock + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + @Mock private ProcessTaskRelationMapper processTaskRelationMapper; @@ -429,11 +444,15 @@ public class TaskDefinitionServiceImplTest { // success Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService - Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class), - isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList()); + Mockito.when( + processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class), + isA(Boolean.class), + isA(TaskRelationUpdateUpstreamRequest.class))) + .thenReturn(getProcessTaskRelationList()); Mockito.when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class), isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition()); Assertions.assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + } @Test @@ -500,10 +519,54 @@ public class TaskDefinitionServiceImplTest { // success Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService - Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class), - isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList()); + Mockito.when( + processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class), + isA(Boolean.class), + isA(TaskRelationUpdateUpstreamRequest.class))) + .thenReturn(getProcessTaskRelationList()); Assertions.assertDoesNotThrow( () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + + TaskDefinition taskDefinition = + taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest); + Assertions.assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion()); + } + + @Test + public void testUpdateDag() { + User loginUser = getLoginUser(); + ProcessDefinition processDefinition = getProcessDefinition(); + processDefinition.setId(null); + List processTaskRelationList = getProcessTaskRelationList(); + TaskDefinitionLog taskDefinitionLog = getTaskDefinitionLog(); + ArrayList taskDefinitionLogs = new ArrayList<>(); + taskDefinitionLogs.add(taskDefinitionLog); + Integer version = 1; + Mockito.when(processDefinitionMapper.queryByCode(isA(long.class))).thenReturn(processDefinition); + + // saveProcessDefine + Mockito.when(processDefineLogMapper.queryMaxVersionForDefinition(isA(long.class))).thenReturn(version); + Mockito.when(processDefineLogMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1); + Mockito.when(processDefinitionMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1); + int insertVersion = + processServiceImpl.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); + Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)) + .thenReturn(insertVersion); + Assertions.assertEquals(insertVersion, version + 1); + + // saveTaskRelation + List processTaskRelationLogList = getProcessTaskRelationLogList(); + Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getProjectCode()), + eq(processDefinition.getCode()))).thenReturn(processTaskRelationList); + Mockito.when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1); + Mockito.when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1); + int insertResult = processServiceImpl.saveTaskRelation(loginUser, processDefinition.getProjectCode(), + processDefinition.getCode(), insertVersion, processTaskRelationLogList, taskDefinitionLogs, + Boolean.TRUE); + Assertions.assertEquals(Constants.EXIT_CODE_SUCCESS, insertResult); + Assertions.assertDoesNotThrow( + () -> taskDefinitionService.updateDag(loginUser, processDefinition.getCode(), processTaskRelationList, + taskDefinitionLogs)); } @Test @@ -528,6 +591,17 @@ public class TaskDefinitionServiceImplTest { Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE)); } + /** + * create admin user + */ + private User getLoginUser() { + User loginUser = new User(); + loginUser.setUserType(UserType.GENERAL_USER); + loginUser.setUserName("admin"); + loginUser.setId(1); + return loginUser; + } + /** * get mock Project * @@ -563,6 +637,19 @@ public class TaskDefinitionServiceImplTest { return taskDefinition; } + private TaskDefinitionLog getTaskDefinitionLog() { + TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); + taskDefinitionLog.setProjectCode(PROJECT_CODE); + taskDefinitionLog.setCode(TASK_CODE); + taskDefinitionLog.setVersion(VERSION); + taskDefinitionLog.setTaskType("SHELL"); + taskDefinitionLog.setTaskParams(TASK_PARAMETER); + taskDefinitionLog.setFlag(Flag.YES); + taskDefinitionLog.setCpuQuota(RESOURCE_RATE); + taskDefinitionLog.setMemoryMax(RESOURCE_RATE); + return taskDefinitionLog; + } + private List getProcessTaskRelationList() { List processTaskRelationList = new ArrayList<>(); @@ -576,6 +663,19 @@ public class TaskDefinitionServiceImplTest { return processTaskRelationList; } + private List getProcessTaskRelationLogList() { + List processTaskRelationLogList = new ArrayList<>(); + + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setProjectCode(PROJECT_CODE); + processTaskRelationLog.setProcessDefinitionCode(PROCESS_DEFINITION_CODE); + processTaskRelationLog.setPreTaskCode(TASK_CODE); + processTaskRelationLog.setPostTaskCode(TASK_CODE + 1L); + + processTaskRelationLogList.add(processTaskRelationLog); + return processTaskRelationLogList; + } + private List getProcessTaskRelationList2() { List processTaskRelationList = new ArrayList<>();