diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 838744767a..303e3e62b0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; @@ -58,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; @@ -141,6 +143,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private ProcessDefinitionService processDefinitionService; + @Autowired + private ProcessDefinitionLogMapper processDefinitionLogMapper; + /** * create task definition * @@ -781,20 +786,59 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe projectCode, taskCode, taskDefinitionToUpdate.getVersion()); // update process task relation List processTaskRelations = processTaskRelationMapper - .queryByTaskCode(taskDefinitionToUpdate.getCode()); + .queryProcessTaskRelationByTaskCodeAndTaskVersion(taskDefinitionToUpdate.getCode(), + taskDefinition.getVersion()); if (CollectionUtils.isNotEmpty(processTaskRelations)) { - for (ProcessTaskRelation processTaskRelation : processTaskRelations) { - if (taskCode == processTaskRelation.getPreTaskCode()) { - processTaskRelation.setPreTaskVersion(version); - } else if (taskCode == processTaskRelation.getPostTaskCode()) { - processTaskRelation.setPostTaskVersion(version); + Map> processTaskRelationGroupList = processTaskRelations.stream() + .collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode)); + for (Map.Entry> processTaskRelationMap : processTaskRelationGroupList + .entrySet()) { + Long processDefinitionCode = processTaskRelationMap.getKey(); + int processDefinitionVersion = + processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionCode) + + 1; + List processTaskRelationList = processTaskRelationMap.getValue(); + for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { + if (taskCode == processTaskRelation.getPreTaskCode()) { + processTaskRelation.setPreTaskVersion(version); + } else if (taskCode == processTaskRelation.getPostTaskCode()) { + processTaskRelation.setPostTaskVersion(version); + } + processTaskRelation.setProcessDefinitionVersion(processDefinitionVersion); + int updateProcessDefinitionVersionCount = + processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation); + if (updateProcessDefinitionVersionCount != 1) { + log.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.", + projectCode, taskCode); + putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR); + throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR); + } + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setId(null); + processTaskRelationLog.setOperateTime(now); + int insertProcessTaskRelationLogCount = processTaskRelationLogDao.insert(processTaskRelationLog); + if (insertProcessTaskRelationLogCount != 1) { + log.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.", + projectCode, taskCode); + putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR); + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR); + } } - int count = processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation); - if (count != 1) { - log.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.", - projectCode, taskCode); - putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR); - throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + processDefinition.setVersion(processDefinitionVersion); + processDefinition.setUpdateTime(now); + processDefinition.setUserId(loginUser.getId()); + // update process definition + int updateProcessDefinitionCount = processDefinitionMapper.updateById(processDefinition); + ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); + processDefinitionLog.setOperateTime(now); + processDefinitionLog.setId(null); + processDefinitionLog.setOperator(loginUser.getId()); + int insertProcessDefinitionLogCount = processDefinitionLogMapper.insert(processDefinitionLog); + if ((updateProcessDefinitionCount & insertProcessDefinitionLogCount) != 1) { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index 26091e7529..2644712074 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -129,6 +129,9 @@ public class TaskDefinitionServiceImplTest { @Mock private ProcessTaskRelationLogDao processTaskRelationLogDao; + @Mock + private ProcessDefinitionLogMapper processDefinitionLogMapper; + private static final String TASK_PARAMETER = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";; private static final long PROJECT_CODE = 1L; @@ -188,9 +191,14 @@ public class TaskDefinitionServiceImplTest { Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition()); Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); + Mockito.when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1); + Mockito.when(processDefinitionMapper.queryByCode(2L)).thenReturn(new ProcessDefinition()); + Mockito.when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1); + Mockito.when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1); Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1); Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); - Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2()); + Mockito.when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE, 0)) + .thenReturn(getProcessTaskRelationList2()); Mockito.when(processTaskRelationMapper .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1); result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index 9b8dfc8736..07ff5c2d86 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -225,4 +225,14 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryProcessTaskRelationByTaskCodeAndTaskVersion(@Param("taskCode") long taskCode, + @Param("postTaskVersion") long postTaskVersion); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index 2da8514c55..742d53726e 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -231,7 +231,8 @@ update t_ds_process_task_relation set pre_task_version=#{processTaskRelation.preTaskVersion}, - post_task_version=#{processTaskRelation.postTaskVersion} + post_task_version=#{processTaskRelation.postTaskVersion}, + process_definition_version=#{processTaskRelation.processDefinitionVersion} where id = #{processTaskRelation.id} @@ -240,4 +241,19 @@ from t_ds_process_task_relation where process_definition_code = #{workflowDefinitionCode} and process_definition_version = #{workflowDefinitionVersion} + +