From d7160874e4f086fafff8675123cb8de7f46cffc1 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Wed, 22 Sep 2021 20:38:22 +0800 Subject: [PATCH] [Fix-6248] [API] fix processDefinnition release/processInstance update/taskDefinition transaction (#6288) * fix processDefinition save/update/delete/move/import transaction * fix ut * fix processDefinnition release/processInstance update/taskDefinition transaction * fix processDefinnition release/processInstance update/taskDefinition transaction Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../ProcessDefinitionController.java | 6 +-- .../dolphinscheduler/api/enums/Status.java | 2 +- .../impl/ProcessDefinitionServiceImpl.java | 18 +++++--- .../impl/ProcessInstanceServiceImpl.java | 45 +++++++++++-------- .../impl/TaskDefinitionServiceImpl.java | 8 ++-- .../service/ProcessDefinitionServiceTest.java | 1 + .../service/ProcessInstanceServiceTest.java | 3 +- 7 files changed, 50 insertions(+), 33 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index b6de4d8c65..d6cb1bacb8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -26,7 +26,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_DEFINI import static org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_TREEVIEW_STRUCTURE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.IMPORT_PROCESS_DEFINE_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_PROCESS_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST_PAGING_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_VERSIONS_ERROR; @@ -388,7 +388,7 @@ public class ProcessDefinitionController extends BaseController { }) @GetMapping(value = "/{code}") @ResponseStatus(HttpStatus.OK) - @ApiException(QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR) + @ApiException(QUERY_DETAIL_OF_PROCESS_DEFINITION_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result queryProcessDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @@ -411,7 +411,7 @@ public class ProcessDefinitionController extends BaseController { }) @GetMapping(value = "/query-by-name") @ResponseStatus(HttpStatus.OK) - @ApiException(QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR) + @ApiException(QUERY_DETAIL_OF_PROCESS_DEFINITION_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result queryProcessDefinitionByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index be0294799e..7ea8f9bdee 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -136,7 +136,7 @@ public enum Status { VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR(10106, "verify process definition name unique error", "工作流定义名称验证错误"), UPDATE_PROCESS_DEFINITION_ERROR(10107, "update process definition error", "更新工作流定义错误"), RELEASE_PROCESS_DEFINITION_ERROR(10108, "release process definition error", "上线工作流错误"), - QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR(10109, "query datail of process definition error", "查询工作流详细信息错误"), + QUERY_DETAIL_OF_PROCESS_DEFINITION_ERROR(10109, "query detail of process definition error", "查询工作流详细信息错误"), QUERY_PROCESS_DEFINITION_LIST(10110, "query process definition list", "查询工作流列表错误"), ENCAPSULATION_TREEVIEW_STRUCTURE_ERROR(10111, "encapsulation treeview structure error", "查询工作流树形图数据错误"), GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR(10112, "get tasks list by process definition id error", "查询工作流定义节点信息错误"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index fe126ce521..164565ee09 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -637,7 +637,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } else if (schedules.size() == 1) { Schedule schedule = schedules.get(0); if (schedule.getReleaseState() == ReleaseState.OFFLINE) { - scheduleMapper.deleteById(schedule.getId()); + int delete = scheduleMapper.deleteById(schedule.getId()); + if (delete == 0) { + putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR); + throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR); + } } else if (schedule.getReleaseState() == ReleaseState.ONLINE) { putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId()); return result; @@ -702,16 +706,20 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro break; case OFFLINE: processDefinition.setReleaseState(releaseState); - processDefinitionMapper.updateById(processDefinition); + int updateProcess = processDefinitionMapper.updateById(processDefinition); List scheduleList = scheduleMapper.selectAllByProcessDefineArray( new long[]{processDefinition.getCode()} ); - - for (Schedule schedule : scheduleList) { + if (updateProcess > 0 && scheduleList.size() == 1) { + Schedule schedule = scheduleList.get(0); logger.info("set schedule offline, project id: {}, schedule id: {}, process definition code: {}", project.getId(), schedule.getId(), code); // set status schedule.setReleaseState(ReleaseState.OFFLINE); - scheduleMapper.updateById(schedule); + int updateSchedule = scheduleMapper.updateById(schedule); + if (updateSchedule == 0) { + putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); + throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR); + } schedulerService.deleteSchedule(project.getId(), schedule.getId()); } break; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index be0cfffe15..194a9922d8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -27,6 +27,7 @@ import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; @@ -413,7 +414,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce * @param tenantCode tenantCode * @return update result code */ - @Transactional + @Transactional(rollbackFor = RuntimeException.class) @Override public Map updateProcessInstance(User loginUser, long projectCode, Integer processInstanceId, String taskRelationJson, String taskDefinitionJson, String scheduleTime, Boolean syncDefine, String globalParams, @@ -451,8 +452,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); if (saveTaskResult == Constants.DEFINITION_FAILURE) { - putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); - return result; + putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); List taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); @@ -470,32 +471,37 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } tenantId = tenant.getId(); } - + ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId); processDefinition.setUpdateTime(new Date()); - int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false); - if (insertVersion > 0) { - int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), - processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); - if (insertResult > 0) { - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); - } else { - putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); - return result; - } + int insertVersion; + if (processDefinition.equals(processDefinitionDeepCopy)) { + insertVersion = processDefinitionDeepCopy.getVersion(); } else { + processDefinition.setUpdateTime(new Date()); + insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false); + } + if (insertVersion == 0) { putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); - return result; + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), + processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); + if (insertResult == Constants.EXIT_CODE_SUCCESS) { + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); + } else { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } processInstance.setProcessDefinitionVersion(insertVersion); } int update = processService.updateProcessInstance(processInstance); - if (update > 0) { - putMsg(result, Status.SUCCESS); - } else { + if (update == 0) { putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_INSTANCE_ERROR); } + putMsg(result, Status.SUCCESS); return result; } @@ -593,6 +599,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); + throw new ServiceException(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); } return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index adc9961f25..38ad7b11c0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.CheckUtils; @@ -128,7 +129,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); if (saveTaskResult == Constants.DEFINITION_FAILURE) { putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); - return result; + throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); } Map resData = new HashMap<>(); resData.put("total", taskDefinitionLogs.size()); @@ -171,7 +172,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * @param projectCode project code * @param taskCode task code */ - @Transactional(rollbackFor = RuntimeException.class) @Override public Map deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) { Project project = projectMapper.queryByCode(projectCode); @@ -255,8 +255,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe taskDefinitionToUpdate.setCreateTime(now); int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate); if ((update & insert) != 1) { - putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); - return result; + putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS, update); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 44ea0b63f1..506bba3f92 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -400,6 +400,7 @@ public class ProcessDefinitionServiceTest { schedule.setReleaseState(ReleaseState.OFFLINE); schedules.add(schedule); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1); + Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1); Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1); Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); putMsg(result, Status.SUCCESS, projectCode); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 0e93f66e9d..ccce682e55 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -415,11 +415,12 @@ public class ProcessInstanceServiceTest { when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant); when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant); when(processService.updateProcessInstance(processInstance)).thenReturn(1); + when(processService.saveProcessDefine(loginUser, processDefinition, false)).thenReturn(1); when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result); putMsg(result, Status.SUCCESS, projectCode); Map processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "root"); - Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, processInstanceFinishRes.get(Constants.STATUS)); + Assert.assertEquals(Status.SUCCESS, processInstanceFinishRes.get(Constants.STATUS)); //success when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);