From 1e9973299fbb7109959941951f3695667bf7e8c8 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Mon, 29 Nov 2021 20:13:49 +0800 Subject: [PATCH] add processDefinition releaseWorkflowAndSchedule (#7044) --- .../ProcessDefinitionController.java | 24 ---- .../api/service/ProcessDefinitionService.java | 12 -- .../impl/ProcessDefinitionServiceImpl.java | 107 ++++++++++++++---- 3 files changed, 82 insertions(+), 61 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index ec7c5f9abb..d6c1849af4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -813,28 +813,4 @@ public class ProcessDefinitionController extends BaseController { @RequestParam(value = "releaseState", required = true, defaultValue = "OFFLINE") ReleaseState releaseState) { return returnDataList(processDefinitionService.releaseWorkflowAndSchedule(loginUser, projectCode, code, releaseState)); } - - /** - * delete process definition and schedule - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @return update result code - */ - @ApiOperation(value = "deleteWorkflowAndSchedule", notes = "DELETE_WORKFLOW_SCHEDULE_NOTES") - @ApiImplicitParams({ - @ApiImplicitParam(name = "projectCode", value = "PROCESS_DEFINITION_NAME", required = true, type = "Long"), - @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "123456789") - }) - @DeleteMapping(value = "/{code}/delete-workflow") - @ResponseStatus(HttpStatus.OK) - @ApiException(DELETE_PROCESS_DEFINE_BY_CODE_ERROR) - @AccessLogAnnotation(ignoreRequestArgs = "loginUser") - public Result deleteWorkflowAndSchedule(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code", required = true) long code) { - return returnDataList(processDefinitionService.deleteWorkflowAndSchedule(loginUser, projectCode, code)); - } - } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index e1e05035e2..2f1c3f7e25 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -397,17 +397,5 @@ public interface ProcessDefinitionService { long projectCode, long code, ReleaseState releaseState); - - /** - * delete process definition and schedule - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @return update result code - */ - Map deleteWorkflowAndSchedule(User loginUser, - long projectCode, - long code); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 2a3c14df56..f472150cdd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -75,6 +75,7 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import java.io.BufferedOutputStream; import java.io.IOException; @@ -719,23 +720,28 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); + return result; + } switch (releaseState) { case ONLINE: + List relationList = processService.findRelationByCode(projectCode, code); + if (CollectionUtils.isEmpty(relationList)) { + putMsg(result, Status.PROCESS_DAG_IS_EMPTY); + return result; + } processDefinition.setReleaseState(releaseState); processDefinitionMapper.updateById(processDefinition); break; case OFFLINE: processDefinition.setReleaseState(releaseState); int updateProcess = processDefinitionMapper.updateById(processDefinition); - List scheduleList = scheduleMapper.selectAllByProcessDefineArray( - new long[]{processDefinition.getCode()} - ); - if (updateProcess > 0 && scheduleList.size() == 1) { - Schedule schedule = scheduleList.get(0); - logger.info("set schedule offline, project id: {}, schedule id: {}, process definition code: {}", project.getId(), schedule.getId(), code); + Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code); + if (updateProcess > 0 && schedule != null) { + logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}", projectCode, schedule.getId(), code); // set status - schedule.setReleaseState(ReleaseState.OFFLINE); + schedule.setReleaseState(releaseState); int updateSchedule = scheduleMapper.updateById(schedule); if (updateSchedule == 0) { putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); @@ -1585,7 +1591,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - if (scheduleJson == null || scheduleJson.trim().isEmpty()) { + if (StringUtils.isBlank(scheduleJson)) { return result; } @@ -1613,14 +1619,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro long projectCode, long processDefinitionCode, String scheduleJson) { + Map result = new HashMap<>(); Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class); + if (schedule == null) { + putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson); + throw new ServiceException(Status.DATA_IS_NOT_VALID); + } // set default value FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy(); WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType(); Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority(); int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId(); String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup(); - Long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode(); + long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode(); ScheduleParam param = new ScheduleParam(); param.setStartTime(schedule.getStartTime()); @@ -1711,7 +1722,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - if (scheduleJson == null || scheduleJson.trim().isEmpty()) { + if (StringUtils.isBlank(scheduleJson)) { return result; } // update dag schedule @@ -1746,14 +1757,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro long projectCode, long processDefinitionCode, String scheduleJson) { + Map result = new HashMap<>(); Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class); + if (schedule == null) { + putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson); + throw new ServiceException(Status.DATA_IS_NOT_VALID); + } // set default value FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy(); WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType(); Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority(); int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId(); String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup(); - Long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode(); + long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode(); ScheduleParam param = new ScheduleParam(); param.setStartTime(schedule.getStartTime()); @@ -1783,21 +1799,62 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param releaseState releaseState * @return update result code */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map releaseWorkflowAndSchedule(User loginUser, long projectCode, long code, ReleaseState releaseState) { - return null; - } + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + // check state + if (null == releaseState) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); + return result; + } - /** - * delete process definition and schedule - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @return update result code - */ - @Override - public Map deleteWorkflowAndSchedule(User loginUser, long projectCode, long code) { - return null; + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); + return result; + } + Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code); + if (scheduleObj == null) { + putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, "processDefinitionCode:" + code); + return result; + } + switch (releaseState) { + case ONLINE: + List relationList = processService.findRelationByCode(projectCode, code); + if (CollectionUtils.isEmpty(relationList)) { + putMsg(result, Status.PROCESS_DAG_IS_EMPTY); + return result; + } + processDefinition.setReleaseState(releaseState); + processDefinitionMapper.updateById(processDefinition); + scheduleMapper.updateById(scheduleObj); + break; + case OFFLINE: + processDefinition.setReleaseState(releaseState); + int updateProcess = processDefinitionMapper.updateById(processDefinition); + if (updateProcess > 0) { + logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}", projectCode, scheduleObj.getId(), code); + // set status + scheduleObj.setReleaseState(ReleaseState.OFFLINE); + int updateSchedule = scheduleMapper.updateById(scheduleObj); + if (updateSchedule == 0) { + putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); + throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR); + } + schedulerService.deleteSchedule(project.getId(), scheduleObj.getId()); + } + break; + default: + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); + return result; + } + putMsg(result, Status.SUCCESS); + return result; } }