Browse Source

update schdule api (#6977)

3.0.0/version-upgrade
JinYong Li 3 years ago committed by GitHub
parent
commit
5a04b8d49a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 27
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 146
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
  4. 18
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  5. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
  6. 6
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java
  7. 6
      dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -269,7 +269,7 @@ public enum Status {
CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"),
PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"),
PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"),
PROCESS_DAG_IS_EMPTY(50035, "process dag is empty", "工作流dag是空"),
CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"),
CREATE_TASK_DEFINITION_ERROR(50037, "create task definition error", "创建任务错误"),
UPDATE_TASK_DEFINITION_ERROR(50038, "update task definition error", "更新任务定义错误"),

27
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -664,21 +664,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
// get the timing according to the process definition
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(code);
if (!schedules.isEmpty() && schedules.size() > 1) {
logger.warn("scheduler num is {},Greater than 1", schedules.size());
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
return result;
} else if (schedules.size() == 1) {
Schedule schedule = schedules.get(0);
if (schedule.getReleaseState() == ReleaseState.OFFLINE) {
int delete = scheduleMapper.deleteById(schedule.getId());
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code);
if (scheduleObj != null) {
if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
int delete = scheduleMapper.deleteById(scheduleObj.getId());
if (delete == 0) {
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
} else if (schedule.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
}
if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId());
return result;
}
}
@ -815,12 +811,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return DagDataSchedule
*/
public DagDataSchedule exportProcessDagData(ProcessDefinition processDefinition) {
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode());
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode());
DagDataSchedule dagDataSchedule = new DagDataSchedule(processService.genDagData(processDefinition));
if (!schedules.isEmpty()) {
Schedule schedule = schedules.get(0);
schedule.setReleaseState(ReleaseState.OFFLINE);
dagDataSchedule.setSchedule(schedule);
if (scheduleObj != null) {
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
dagDataSchedule.setSchedule(scheduleObj);
}
return dagDataSchedule;
}

146
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -36,10 +36,12 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -101,6 +103,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Autowired
private Scheduler scheduler;
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
/**
* save schedule
*
@ -247,57 +252,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
/**
* scheduling on-line status forbid modification
*/
if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
return result;
}
Date now = new Date();
// updateProcessInstance param
if (!StringUtils.isEmpty(scheduleExpression)) {
ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
logger.warn("The start time must not be the same as the end");
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
schedule.setStartTime(scheduleParam.getStartTime());
schedule.setEndTime(scheduleParam.getEndTime());
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
return result;
}
schedule.setCrontab(scheduleParam.getCrontab());
schedule.setTimezoneId(scheduleParam.getTimezoneId());
}
if (warningType != null) {
schedule.setWarningType(warningType);
}
schedule.setWarningGroupId(warningGroupId);
if (failureStrategy != null) {
schedule.setFailureStrategy(failureStrategy);
}
schedule.setWorkerGroup(workerGroup);
schedule.setEnvironmentCode(environmentCode);
schedule.setUpdateTime(now);
schedule.setProcessInstancePriority(processInstancePriority);
scheduleMapper.updateById(schedule);
/**
* updateProcessInstance recipients and cc by process definition ID
*/
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
putMsg(result, Status.SUCCESS);
updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode);
return result;
}
@ -345,7 +300,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode());
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
if (processTaskRelations.isEmpty()) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
@ -633,6 +592,87 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Priority processInstancePriority,
String workerGroup,
long environmentCode) {
return null;
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// check schedule exists
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
if (schedule == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, processDefinitionCode);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}
updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode);
return result;
}
private void updateSchedule(Map<String, Object> result,
Schedule schedule,
ProcessDefinition processDefinition,
String scheduleExpression,
WarningType warningType,
int warningGroupId,
FailureStrategy failureStrategy,
Priority processInstancePriority,
String workerGroup,
long environmentCode) {
if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
return;
}
Date now = new Date();
// updateProcessInstance param
if (!StringUtils.isEmpty(scheduleExpression)) {
ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
if (scheduleParam == null) {
putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
return;
}
if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
logger.warn("The start time must not be the same as the end");
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return;
}
schedule.setStartTime(scheduleParam.getStartTime());
schedule.setEndTime(scheduleParam.getEndTime());
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
return;
}
schedule.setCrontab(scheduleParam.getCrontab());
schedule.setTimezoneId(scheduleParam.getTimezoneId());
}
if (warningType != null) {
schedule.setWarningType(warningType);
}
schedule.setWarningGroupId(warningGroupId);
if (failureStrategy != null) {
schedule.setFailureStrategy(failureStrategy);
}
schedule.setWorkerGroup(workerGroup);
schedule.setEnvironmentCode(environmentCode);
schedule.setUpdateTime(now);
schedule.setProcessInstancePriority(processInstancePriority);
scheduleMapper.updateById(schedule);
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
putMsg(result, Status.SUCCESS);
}
}

18
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -378,32 +378,28 @@ public class ProcessDefinitionServiceTest {
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
List<Schedule> schedules = new ArrayList<>();
schedules.add(getSchedule());
schedules.add(getSchedule());
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
Map<String, Object> schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS));
Assert.assertEquals(Status.SUCCESS, schedulerGreaterThanOneRes.get(Constants.STATUS));
//scheduler online
schedules.clear();
Schedule schedule = getSchedule();
schedule.setReleaseState(ReleaseState.ONLINE);
schedules.add(schedule);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule);
Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
//delete success
schedules.clear();
schedule.setReleaseState(ReleaseState.OFFLINE);
schedules.add(schedule);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java

@ -58,9 +58,9 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
/**
* query schedule list by process definition code
* @param processDefinitionCode processDefinitionCode
* @return schedule list
* @return schedule
*/
List<Schedule> queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
* query schedule list by process definition code

6
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java

@ -200,12 +200,12 @@ public class ScheduleMapperTest extends BaseDaoTest {
* test query by process definition id
*/
@Test
public void queryByProcessDefinitionId() {
public void queryByProcessDefinitionCode() {
Schedule schedule = insertOne();
schedule.setProcessDefinitionCode(12345);
scheduleMapper.updateById(schedule);
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode());
Assert.assertNotEquals(schedules.size(), 0);
Schedule schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode());
Assert.assertNotNull(schedules);
}
}

6
dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

@ -259,16 +259,16 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
long processDefinitionCode,
String schedule,
String workerGroup) {
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
// create or update schedule
int scheduleId;
if (schedules.isEmpty()) {
if (scheduleObj == null) {
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
scheduleId = (int) result.get("scheduleId");
} else {
scheduleId = schedules.get(0).getId();
scheduleId = scheduleObj.getId();
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);

Loading…
Cancel
Save