From 5a04b8d49aa8e30a60a3fdc0cdb3b2f6910aa084 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Thu, 25 Nov 2021 10:22:41 +0800 Subject: [PATCH] update schdule api (#6977) --- .../dolphinscheduler/api/enums/Status.java | 2 +- .../impl/ProcessDefinitionServiceImpl.java | 27 ++-- .../service/impl/SchedulerServiceImpl.java | 146 +++++++++++------- .../service/ProcessDefinitionServiceTest.java | 18 +-- .../dao/mapper/ScheduleMapper.java | 4 +- .../dao/mapper/ScheduleMapperTest.java | 6 +- .../server/PythonGatewayServer.java | 6 +- 7 files changed, 120 insertions(+), 89 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 506e6a6558..4fbec8efbd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -269,7 +269,7 @@ public enum Status { CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"), PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"), PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"), - PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"), + PROCESS_DAG_IS_EMPTY(50035, "process dag is empty", "工作流dag是空"), CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"), CREATE_TASK_DEFINITION_ERROR(50037, "create task definition error", "创建任务错误"), UPDATE_TASK_DEFINITION_ERROR(50038, "update task definition error", "更新任务定义错误"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 17ca1dd10b..1276717243 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -664,21 +664,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } // get the timing according to the process definition - List schedules = scheduleMapper.queryByProcessDefinitionCode(code); - if (!schedules.isEmpty() && schedules.size() > 1) { - logger.warn("scheduler num is {},Greater than 1", schedules.size()); - putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); - return result; - } else if (schedules.size() == 1) { - Schedule schedule = schedules.get(0); - if (schedule.getReleaseState() == ReleaseState.OFFLINE) { - int delete = scheduleMapper.deleteById(schedule.getId()); + Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code); + if (scheduleObj != null) { + if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) { + int delete = scheduleMapper.deleteById(scheduleObj.getId()); if (delete == 0) { putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR); throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR); } - } else if (schedule.getReleaseState() == ReleaseState.ONLINE) { - putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId()); + } + if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) { + putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId()); return result; } } @@ -815,12 +811,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @return DagDataSchedule */ public DagDataSchedule exportProcessDagData(ProcessDefinition processDefinition) { - List schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode()); + Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode()); DagDataSchedule dagDataSchedule = new DagDataSchedule(processService.genDagData(processDefinition)); - if (!schedules.isEmpty()) { - Schedule schedule = schedules.get(0); - schedule.setReleaseState(ReleaseState.OFFLINE); - dagDataSchedule.setSchedule(schedule); + if (scheduleObj != null) { + scheduleObj.setReleaseState(ReleaseState.OFFLINE); + dagDataSchedule.setSchedule(scheduleObj); } return dagDataSchedule; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index 0a920aac7b..7658efe28a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -36,10 +36,12 @@ import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -101,6 +103,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe @Autowired private Scheduler scheduler; + @Autowired + private ProcessTaskRelationMapper processTaskRelationMapper; + /** * save schedule * @@ -247,57 +252,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe return result; } - /** - * scheduling on-line status forbid modification - */ - if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) { - return result; - } - - Date now = new Date(); - - // updateProcessInstance param - if (!StringUtils.isEmpty(scheduleExpression)) { - ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class); - if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) { - logger.warn("The start time must not be the same as the end"); - putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME); - return result; - } - schedule.setStartTime(scheduleParam.getStartTime()); - schedule.setEndTime(scheduleParam.getEndTime()); - if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) { - putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab()); - return result; - } - schedule.setCrontab(scheduleParam.getCrontab()); - schedule.setTimezoneId(scheduleParam.getTimezoneId()); - } - - if (warningType != null) { - schedule.setWarningType(warningType); - } - - schedule.setWarningGroupId(warningGroupId); - - if (failureStrategy != null) { - schedule.setFailureStrategy(failureStrategy); - } - - schedule.setWorkerGroup(workerGroup); - schedule.setEnvironmentCode(environmentCode); - schedule.setUpdateTime(now); - schedule.setProcessInstancePriority(processInstancePriority); - scheduleMapper.updateById(schedule); - - /** - * updateProcessInstance recipients and cc by process definition ID - */ - processDefinition.setWarningGroupId(warningGroupId); - - processDefinitionMapper.updateById(processDefinition); - - putMsg(result, Status.SUCCESS); + updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode); return result; } @@ -345,7 +300,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode()); return result; } - + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode()); + if (processTaskRelations.isEmpty()) { + putMsg(result, Status.PROCESS_DAG_IS_EMPTY); + return result; + } if (scheduleStatus == ReleaseState.ONLINE) { // check process definition release state if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { @@ -633,6 +592,87 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe Priority processInstancePriority, String workerGroup, long environmentCode) { - return null; + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + // check schedule exists + Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode); + if (schedule == null) { + putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, processDefinitionCode); + return result; + } + + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); + return result; + } + + updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode); + return result; + } + + private void updateSchedule(Map result, + Schedule schedule, + ProcessDefinition processDefinition, + String scheduleExpression, + WarningType warningType, + int warningGroupId, + FailureStrategy failureStrategy, + Priority processInstancePriority, + String workerGroup, + long environmentCode) { + if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) { + return; + } + + Date now = new Date(); + + // updateProcessInstance param + if (!StringUtils.isEmpty(scheduleExpression)) { + ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class); + if (scheduleParam == null) { + putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR); + return; + } + if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) { + logger.warn("The start time must not be the same as the end"); + putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME); + return; + } + schedule.setStartTime(scheduleParam.getStartTime()); + schedule.setEndTime(scheduleParam.getEndTime()); + if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) { + putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab()); + return; + } + schedule.setCrontab(scheduleParam.getCrontab()); + schedule.setTimezoneId(scheduleParam.getTimezoneId()); + } + + if (warningType != null) { + schedule.setWarningType(warningType); + } + + schedule.setWarningGroupId(warningGroupId); + + if (failureStrategy != null) { + schedule.setFailureStrategy(failureStrategy); + } + + schedule.setWorkerGroup(workerGroup); + schedule.setEnvironmentCode(environmentCode); + schedule.setUpdateTime(now); + schedule.setProcessInstancePriority(processInstancePriority); + scheduleMapper.updateById(schedule); + + processDefinition.setWarningGroupId(warningGroupId); + + processDefinitionMapper.updateById(processDefinition); + + putMsg(result, Status.SUCCESS); } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 7fe02eb7e3..964555b4c3 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -378,32 +378,28 @@ public class ProcessDefinitionServiceTest { Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - List schedules = new ArrayList<>(); - schedules.add(getSchedule()); - schedules.add(getSchedule()); - Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule()); + Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1); + Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1); + Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1); Map schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); - Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS)); + Assert.assertEquals(Status.SUCCESS, schedulerGreaterThanOneRes.get(Constants.STATUS)); //scheduler online - schedules.clear(); Schedule schedule = getSchedule(); schedule.setReleaseState(ReleaseState.ONLINE); - schedules.add(schedule); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule); Map schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS)); //delete success - schedules.clear(); schedule.setReleaseState(ReleaseState.OFFLINE); - schedules.add(schedule); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1); Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1); Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1); - Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule()); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); Map deleteSuccess = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java index 37fae5d393..be7369c708 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java @@ -58,9 +58,9 @@ public interface ScheduleMapper extends BaseMapper { /** * query schedule list by process definition code * @param processDefinitionCode processDefinitionCode - * @return schedule list + * @return schedule */ - List queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); + Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); /** * query schedule list by process definition code diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java index 65874c9ba1..be4ff6f73f 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java @@ -200,12 +200,12 @@ public class ScheduleMapperTest extends BaseDaoTest { * test query by process definition id */ @Test - public void queryByProcessDefinitionId() { + public void queryByProcessDefinitionCode() { Schedule schedule = insertOne(); schedule.setProcessDefinitionCode(12345); scheduleMapper.updateById(schedule); - List schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode()); - Assert.assertNotEquals(schedules.size(), 0); + Schedule schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode()); + Assert.assertNotNull(schedules); } } diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java index 77c23449de..531c55b1c1 100644 --- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java +++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java @@ -259,16 +259,16 @@ public class PythonGatewayServer extends SpringBootServletInitializer { long processDefinitionCode, String schedule, String workerGroup) { - List schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode); + Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode); // create or update schedule int scheduleId; - if (schedules.isEmpty()) { + if (scheduleObj == null) { processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE); Map result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE, DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); scheduleId = (int) result.get("scheduleId"); } else { - scheduleId = schedules.get(0).getId(); + scheduleId = scheduleObj.getId(); processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE, DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);