From 1a4d7a842663574752bcabc222d2d0d778089fb7 Mon Sep 17 00:00:00 2001 From: HomminLee Date: Thu, 29 Sep 2022 19:08:26 +0800 Subject: [PATCH] [Improvement-11678][API] Improvement the error message when batch delete workflow (#11682) * [Improvement] Improvement the error message when batch delete workflow --- .../ProcessDefinitionController.java | 20 +------ .../dolphinscheduler/api/enums/Status.java | 8 ++- .../api/service/ProcessDefinitionService.java | 12 ++++ .../impl/ProcessDefinitionServiceImpl.java | 39 +++++++++++++ .../service/ProcessDefinitionServiceTest.java | 55 +++++++++++++++++++ 5 files changed, 112 insertions(+), 22 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index c6c1cab8d2..ecdc4b89f3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -684,26 +684,8 @@ public class ProcessDefinitionController extends BaseController { public Result batchDeleteProcessDefinitionByCodes(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @RequestParam("codes") String codes) { - Map result = new HashMap<>(); - Set deleteFailedCodeSet = new HashSet<>(); - if (!StringUtils.isEmpty(codes)) { - String[] processDefinitionCodeArray = codes.split(","); - for (String strProcessDefinitionCode : processDefinitionCodeArray) { - long code = Long.parseLong(strProcessDefinitionCode); - try { - processDefinitionService.deleteProcessDefinitionByCode(loginUser, code); - } catch (ServiceException e) { - deleteFailedCodeSet.add(MessageFormat.format(Status.DELETE_PROCESS_DEFINE_BY_CODES_ERROR.getMsg(), - strProcessDefinitionCode)); - } - } - } - if (!deleteFailedCodeSet.isEmpty()) { - putMsg(result, BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR, String.join("\n", deleteFailedCodeSet)); - } else { - putMsg(result, Status.SUCCESS); - } + Map result = processDefinitionService.batchDeleteProcessDefinitionByCodes(loginUser, projectCode, codes); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 231a9b5d08..0581c32668 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -317,9 +317,10 @@ public enum Status { SCHEDULE_STATE_ONLINE(50023, "the status of schedule {0} is already online", "调度配置[{0}]已上线"), DELETE_SCHEDULE_BY_ID_ERROR(50024, "delete schedule by id error", "删除调度配置错误"), BATCH_DELETE_PROCESS_DEFINE_ERROR(50025, "batch delete process definition error", "批量删除工作流定义错误"), - BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR(50026, "batch delete process definition by codes {0} error", - "批量删除工作流定义[{0}]错误"), - DELETE_PROCESS_DEFINE_BY_CODES_ERROR(50026, "delete process definition by codes {0} error", "删除工作流定义[{0}]错误"), + BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR(50026, "batch delete process definition by codes error: {0}", + "批量删除工作流定义错误: {0}"), + DELETE_PROCESS_DEFINE_BY_CODES_ERROR(50026, "delete process definition by codes error: {0}", + "删除工作流定义错误: {0}"), TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"), @@ -355,6 +356,7 @@ public enum Status { NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{0}] does not support copy", "不支持复制的任务类型[{0}]"), BATCH_EXECUTE_PROCESS_INSTANCE_ERROR(50058, "change process instance status error: {0}", "修改工作实例状态错误: {0}"), START_TASK_INSTANCE_ERROR(50059, "start task instance error", "运行任务流实例错误"), + DELETE_PROCESS_DEFINE_ERROR(50060, "delete process definition [{0}] error: {1}", "删除工作流定义[{0}]错误: {1}"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 464ee139be..e887d0f017 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -236,6 +236,18 @@ public interface ProcessDefinitionService { String name, long processDefinitionCode); + /** + * batch delete process definition by code + * + * @param loginUser login user + * @param projectCode project code + * @param codes process definition codes + * @return delete result code + */ + Map batchDeleteProcessDefinitionByCodes(User loginUser, + long projectCode, + String codes); + /** * delete process definition by code * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index dac045b28e..8758b25dd9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -950,6 +950,45 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } + @Override + @Transactional + public Map batchDeleteProcessDefinitionByCodes(User loginUser, long projectCode, String codes) { + Map result = new HashMap<>(); + if (StringUtils.isEmpty(codes)) { + logger.error("Parameter processDefinitionCodes is empty, projectCode is {}.", projectCode); + putMsg(result, Status.PROCESS_DEFINITION_CODES_IS_EMPTY); + return result; + } + + Set definitionCodes = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong) + .collect(Collectors.toSet()); + List processDefinitionList = processDefinitionMapper.queryByCodes(definitionCodes); + Set queryCodes = + processDefinitionList.stream().map(ProcessDefinition::getCode).collect(Collectors.toSet()); + // definitionCodes - queryCodes + Set diffCode = + definitionCodes.stream().filter(code -> !queryCodes.contains(code)).collect(Collectors.toSet()); + + if (!diffCode.isEmpty()) { + logger.error("Process definition does not exist, processCodes:{}.", + diffCode.stream().map(String::valueOf).collect(Collectors.joining(Constants.COMMA))); + throw new ServiceException(Status.BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR, + diffCode.stream().map(code -> code + "[process definition not exist]") + .collect(Collectors.joining(Constants.COMMA)) + ); + } + + for (ProcessDefinition process : processDefinitionList) { + try { + this.deleteProcessDefinitionByCode(loginUser, process.getCode()); + } catch (Exception e) { + throw new ServiceException(Status.DELETE_PROCESS_DEFINE_ERROR, process.getName(), e.getMessage()); + } + } + putMsg(result, Status.SUCCESS); + return result; + } + /** * Process definition want to delete whether used in other task, should throw exception when have be used. * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 0253182c1c..c96e067e58 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -74,6 +74,7 @@ import org.apache.commons.lang3.StringUtils; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -504,6 +505,60 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); } + @Test + public void batchDeleteProcessDefinitionByCodeTest() { + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); + + Project project = getProject(projectCode); + + // process check exists + final String twoCodes = "11,12"; + Set definitionCodes = Lists.newArrayList(twoCodes.split(Constants.COMMA)).stream() + .map(Long::parseLong).collect(Collectors.toSet()); + ProcessDefinition process = getProcessDefinition(); + List processDefinitionList = new ArrayList<>(); + processDefinitionList.add(process); + Mockito.when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); + Throwable exception = Assertions.assertThrows(ServiceException.class, + () -> processDefinitionService.batchDeleteProcessDefinitionByCodes(user, projectCode, twoCodes)); + String formatter = MessageFormat.format(Status.BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR.getMsg(), + "12[process definition not exist]"); + Assertions.assertEquals(formatter, exception.getMessage()); + + // return the right data + Map result = new HashMap<>(); + final String singleCodes = "11"; + definitionCodes = Lists.newArrayList(singleCodes.split(Constants.COMMA)).stream().map(Long::parseLong) + .collect(Collectors.toSet()); + Mockito.when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); + Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(process); + + // process definition online + user.setUserType(UserType.ADMIN_USER); + putMsg(result, Status.SUCCESS, projectCode); + process.setReleaseState(ReleaseState.ONLINE); + exception = Assertions.assertThrows(ServiceException.class, + () -> processDefinitionService.batchDeleteProcessDefinitionByCodes(user, projectCode, singleCodes)); + String subFormatter = + MessageFormat.format(Status.PROCESS_DEFINE_STATE_ONLINE.getMsg(), process.getName()); + formatter = + MessageFormat.format(Status.DELETE_PROCESS_DEFINE_ERROR.getMsg(), process.getName(), subFormatter); + Assertions.assertEquals(formatter, exception.getMessage()); + + // delete success + process.setReleaseState(ReleaseState.OFFLINE); + Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(process); + Mockito.when(processDefinitionMapper.deleteById(process.getId())).thenReturn(1); + Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), process.getCode())) + .thenReturn(1); + Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), process.getCode())) + .thenReturn(Collections.emptySet()); + putMsg(result, Status.SUCCESS, projectCode); + Map deleteSuccess = + processDefinitionService.batchDeleteProcessDefinitionByCodes(user, projectCode, singleCodes); + Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS)); + } + @Test public void testReleaseProcessDefinition() { Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));