From 0419543d817ad012ba534e4aa08d81c932b7ffc3 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 2 Apr 2024 15:29:58 +0800 Subject: [PATCH] Directly use Quartz replase api to bootstrap a schedule (#15781) --- .../scheduler/api/SchedulerException.java | 8 +- .../scheduler/api/SchedulerExceptionEnum.java | 26 ++++ .../scheduler/quartz/ProcessScheduleTask.java | 11 +- .../quartz/QuartzCornTriggerBuilder.java | 95 ++++++++++++++ .../scheduler/quartz/QuartzJobData.java | 62 +++++++++ .../quartz/QuartzJobDetailBuilder.java | 58 +++++++++ .../scheduler/quartz/QuartzJobKey.java | 49 +++++++ .../scheduler/quartz/QuartzScheduler.java | 123 ++++-------------- .../quartz/QuartzSchedulerConfiguration.java | 14 +- .../quartz/QuartzTriggerBuilder.java | 25 ++++ .../QuartzSchedulerExceptionEnum.java | 48 +++++++ .../quartz/utils/QuartzTaskUtils.java | 59 --------- .../scheduler/quartz/QuartzJobDataTest.java | 56 ++++++++ .../scheduler/quartz/QuartzJobKeyTest.java | 41 ++++++ .../QuartzSchedulerExceptionEnumTest.java | 35 +++++ 15 files changed, 535 insertions(+), 175 deletions(-) create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerExceptionEnum.java create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzCornTriggerBuilder.java create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobData.java create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobDetailBuilder.java create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobKey.java create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzTriggerBuilder.java create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/exception/QuartzSchedulerExceptionEnum.java delete mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/utils/QuartzTaskUtils.java create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobDataTest.java create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobKeyTest.java create mode 100644 dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/exception/QuartzSchedulerExceptionEnumTest.java diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerException.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerException.java index c81f114204..5808787cda 100644 --- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerException.java +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerException.java @@ -19,12 +19,12 @@ package org.apache.dolphinscheduler.scheduler.api; public class SchedulerException extends RuntimeException { - public SchedulerException(String message) { - super(message); + public SchedulerException(SchedulerExceptionEnum schedulerExceptionEnum) { + super("Scheduler[" + schedulerExceptionEnum.getCode() + "] " + schedulerExceptionEnum.getMessage()); } - public SchedulerException(String message, Throwable cause) { - super(message, cause); + public SchedulerException(SchedulerExceptionEnum schedulerExceptionEnum, Throwable cause) { + super("Scheduler[" + schedulerExceptionEnum.getCode() + "] " + schedulerExceptionEnum.getMessage(), cause); } } diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerExceptionEnum.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerExceptionEnum.java new file mode 100644 index 0000000000..d8a0a5ef4e --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerExceptionEnum.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.api; + +public interface SchedulerExceptionEnum { + + String getCode(); + + String getMessage(); + +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java index 2c189d2bf8..c6bd4cf93f 100644 --- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -33,7 +32,6 @@ import java.util.Date; import lombok.extern.slf4j.Slf4j; -import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobKey; import org.quartz.Scheduler; @@ -56,10 +54,9 @@ public class ProcessScheduleTask extends QuartzJobBean { @Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) @Override protected void executeInternal(JobExecutionContext context) { - JobDataMap dataMap = context.getJobDetail().getJobDataMap(); - - int projectId = dataMap.getInt(QuartzTaskUtils.PROJECT_ID); - int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID); + QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap()); + int projectId = quartzJobData.getProjectId(); + int scheduleId = quartzJobData.getScheduleId(); Date scheduledFireTime = context.getScheduledFireTime(); @@ -110,7 +107,7 @@ public class ProcessScheduleTask extends QuartzJobBean { private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) { final Scheduler scheduler = context.getScheduler(); - JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId, projectId); + JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey(); try { if (scheduler.checkExists(jobKey)) { log.info("Try to delete job: {}, projectId: {}, schedulerId", projectId, scheduleId); diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzCornTriggerBuilder.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzCornTriggerBuilder.java new file mode 100644 index 0000000000..b7177e74db --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzCornTriggerBuilder.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.quartz; + +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.entity.Schedule; + +import java.util.Date; + +import org.quartz.CronScheduleBuilder; +import org.quartz.CronTrigger; +import org.quartz.JobKey; +import org.quartz.TriggerBuilder; +import org.quartz.TriggerKey; + +/** + * QuartzCornTriggerBuilder used to build a {@link CronTrigger} instance. + */ +public class QuartzCornTriggerBuilder implements QuartzTriggerBuilder { + + private Integer projectId; + + private Schedule schedule; + + public static QuartzCornTriggerBuilder newBuilder() { + return new QuartzCornTriggerBuilder(); + } + + public QuartzCornTriggerBuilder withProjectId(Integer projectId) { + this.projectId = projectId; + return this; + } + + public QuartzCornTriggerBuilder withSchedule(Schedule schedule) { + this.schedule = schedule; + return this; + } + + @Override + public CronTrigger build() { + + if (projectId == null) { + throw new IllegalArgumentException("projectId cannot be null"); + } + if (schedule == null) { + throw new IllegalArgumentException("schedule cannot be null"); + } + + /** + * transform from server default timezone to schedule timezone + * e.g. server default timezone is `UTC` + * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`, + * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours, + * so when add job to quartz, it should recover by transform timezone + */ + Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), schedule.getTimezoneId()); + Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), schedule.getTimezoneId()); + /** + * If the start time is less than the current time, the start time is set to the current time. + * We do this change to avoid misfires all triggers when update the scheduler. + */ + Date now = new Date(); + if (startDate.before(now)) { + startDate = now; + } + JobKey jobKey = QuartzJobKey.of(projectId, schedule.getId()).toJobKey(); + + TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup()); + return TriggerBuilder.newTrigger() + .withIdentity(triggerKey) + .startAt(startDate) + .endAt(endDate) + .withSchedule( + CronScheduleBuilder.cronSchedule(schedule.getCrontab()) + .withMisfireHandlingInstructionIgnoreMisfires() + .inTimeZone(DateUtils.getTimezone(schedule.getTimezoneId()))) + .build(); + } + +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobData.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobData.java new file mode 100644 index 0000000000..09e255bef3 --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobData.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.quartz; + +import lombok.Getter; + +import org.quartz.JobDataMap; + +@Getter +public class QuartzJobData { + + private static final String PROJECT_ID = "projectId"; + private static final String SCHEDULE_ID = "scheduleId"; + + private final Integer projectId; + + private final Integer scheduleId; + + private QuartzJobData(Integer projectId, Integer scheduleId) { + if (projectId == null) { + throw new IllegalArgumentException("projectId cannot be null"); + } + if (scheduleId == null) { + throw new IllegalArgumentException("schedule cannot be null"); + } + this.projectId = projectId; + this.scheduleId = scheduleId; + } + + public static QuartzJobData of(Integer projectId, Integer scheduleId) { + return new QuartzJobData(projectId, scheduleId); + } + + public static QuartzJobData of(JobDataMap jobDataMap) { + Integer projectId = jobDataMap.getInt(PROJECT_ID); + Integer scheduleId = jobDataMap.getInt(SCHEDULE_ID); + return of(projectId, scheduleId); + } + + public JobDataMap toJobDataMap() { + JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put(PROJECT_ID, projectId); + jobDataMap.put(SCHEDULE_ID, scheduleId); + return jobDataMap; + } + +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobDetailBuilder.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobDetailBuilder.java new file mode 100644 index 0000000000..7a579d0183 --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobDetailBuilder.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.quartz; + +import org.quartz.JobBuilder; +import org.quartz.JobDetail; + +public class QuartzJobDetailBuilder { + + private Integer projectId; + + private Integer scheduleId; + + public static QuartzJobDetailBuilder newBuilder() { + return new QuartzJobDetailBuilder(); + } + + public QuartzJobDetailBuilder withProjectId(Integer projectId) { + this.projectId = projectId; + return this; + } + + public QuartzJobDetailBuilder withSchedule(Integer scheduleId) { + this.scheduleId = scheduleId; + return this; + } + + public JobDetail build() { + if (projectId == null) { + throw new IllegalArgumentException("projectId cannot be null"); + } + if (scheduleId == null) { + throw new IllegalArgumentException("scheduleId cannot be null"); + } + QuartzJobData quartzJobData = QuartzJobData.of(projectId, scheduleId); + + return JobBuilder.newJob(ProcessScheduleTask.class) + .withIdentity(QuartzJobKey.of(projectId, scheduleId).toJobKey()) + .setJobData(quartzJobData.toJobDataMap()) + .build(); + } + +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobKey.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobKey.java new file mode 100644 index 0000000000..b7666094e2 --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobKey.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.quartz; + +import lombok.Getter; + +import org.quartz.JobKey; + +@Getter +public class QuartzJobKey { + + private final int schedulerId; + private final int projectId; + + private static final String QUARTZ_JOB_PREFIX = "job"; + private static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup"; + private static final String UNDERLINE = "_"; + + private QuartzJobKey(int projectId, int schedulerId) { + this.schedulerId = schedulerId; + this.projectId = projectId; + } + + public static QuartzJobKey of(int projectId, int schedulerId) { + return new QuartzJobKey(projectId, schedulerId); + } + + public JobKey toJobKey() { + // todo: We don't need to add prefix to job name and job group? + String jobName = QUARTZ_JOB_PREFIX + UNDERLINE + schedulerId; + String jobGroup = QUARTZ_JOB_GROUP_PREFIX + UNDERLINE + projectId; + return new JobKey(jobName, jobGroup); + } +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzScheduler.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzScheduler.java index 7d0f1ce73c..70d5bf8fab 100644 --- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzScheduler.java +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzScheduler.java @@ -17,20 +17,10 @@ package org.apache.dolphinscheduler.scheduler.quartz; -import static org.quartz.CronScheduleBuilder.cronSchedule; -import static org.quartz.JobBuilder.newJob; -import static org.quartz.TriggerBuilder.newTrigger; - -import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; import org.apache.dolphinscheduler.scheduler.api.SchedulerException; -import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils; - -import java.util.Date; -import java.util.Map; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.dolphinscheduler.scheduler.quartz.exception.QuartzSchedulerExceptionEnum; import lombok.extern.slf4j.Slf4j; @@ -38,118 +28,49 @@ import org.quartz.CronTrigger; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; -import org.quartz.TriggerKey; -import org.springframework.beans.factory.annotation.Autowired; -import com.google.common.base.Strings; +import com.google.common.collect.Sets; @Slf4j public class QuartzScheduler implements SchedulerApi { - @Autowired - private Scheduler scheduler; + private final Scheduler scheduler; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + public QuartzScheduler(Scheduler scheduler) { + this.scheduler = scheduler; + } @Override public void start() throws SchedulerException { try { scheduler.start(); } catch (Exception e) { - throw new SchedulerException("Failed to start quartz scheduler ", e); + throw new SchedulerException(QuartzSchedulerExceptionEnum.QUARTZ_SCHEDULER_START_ERROR, e); } } @Override public void insertOrUpdateScheduleTask(int projectId, Schedule schedule) throws SchedulerException { - JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId(), projectId); - Map jobDataMap = QuartzTaskUtils.buildDataMap(projectId, schedule); - String cronExpression = schedule.getCrontab(); - String timezoneId = schedule.getTimezoneId(); - - /** - * transform from server default timezone to schedule timezone - * e.g. server default timezone is `UTC` - * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`, - * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours, - * so when add job to quartz, it should recover by transform timezone - */ - Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId); - Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId); - /** - * If the start time is less than the current time, the start time is set to the current time. - * We do this change to avoid misfires all triggers when update the scheduler. - */ - Date now = new Date(); - if (startDate.before(now)) { - startDate = now; - } - - lock.writeLock().lock(); try { - - JobDetail jobDetail; - // add a task (if this task already exists, return this task directly) - if (scheduler.checkExists(jobKey)) { - - jobDetail = scheduler.getJobDetail(jobKey); - jobDetail.getJobDataMap().putAll(jobDataMap); - } else { - jobDetail = newJob(ProcessScheduleTask.class).withIdentity(jobKey).build(); - - jobDetail.getJobDataMap().putAll(jobDataMap); - - scheduler.addJob(jobDetail, false, true); - - log.info("Add job, job name: {}, group name: {}", jobKey.getName(), jobKey.getGroup()); - } - - TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup()); - /* - * Instructs the Scheduler that upon a mis-fire situation, the CronTrigger wants to have it's next-fire-time - * updated to the next time in the schedule after the current time (taking into account any associated - * Calendar), but it does not want to be fired now. - */ - CronTrigger cronTrigger = newTrigger() - .withIdentity(triggerKey) - .startAt(startDate) - .endAt(endDate) - .withSchedule( - cronSchedule(cronExpression) - .withMisfireHandlingInstructionIgnoreMisfires() - .inTimeZone(DateUtils.getTimezone(timezoneId))) - .forJob(jobDetail).build(); - - if (scheduler.checkExists(triggerKey)) { - // updateProcessInstance scheduler trigger when scheduler cycle changes - CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey); - String oldCronExpression = oldCronTrigger.getCronExpression(); - - if (!Strings.nullToEmpty(cronExpression).equalsIgnoreCase(Strings.nullToEmpty(oldCronExpression))) { - // reschedule job trigger - scheduler.rescheduleJob(triggerKey, cronTrigger); - log.info( - "reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", - triggerKey.getName(), triggerKey.getGroup(), cronExpression, startDate, endDate); - } - } else { - scheduler.scheduleJob(cronTrigger); - log.info( - "schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", - triggerKey.getName(), triggerKey.getGroup(), cronExpression, startDate, endDate); - } - + CronTrigger cornTrigger = QuartzCornTriggerBuilder.newBuilder() + .withProjectId(projectId) + .withSchedule(schedule) + .build(); + JobDetail jobDetail = QuartzJobDetailBuilder.newBuilder() + .withProjectId(projectId) + .withSchedule(schedule.getId()) + .build(); + scheduler.scheduleJob(jobDetail, Sets.newHashSet(cornTrigger), true); + log.info("Success scheduleJob: {} with trigger: {} at quartz", jobDetail, cornTrigger); } catch (Exception e) { log.error("Failed to add scheduler task, projectId: {}, scheduler: {}", projectId, schedule, e); - throw new SchedulerException("Add schedule job failed", e); - } finally { - lock.writeLock().unlock(); + throw new SchedulerException(QuartzSchedulerExceptionEnum.QUARTZ_UPSERT_JOB_ERROR, e); } } @Override public void deleteScheduleTask(int projectId, int scheduleId) throws SchedulerException { - JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId, projectId); + JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey(); try { if (scheduler.checkExists(jobKey)) { log.info("Try to delete scheduler task, projectId: {}, schedulerId: {}", projectId, scheduleId); @@ -157,7 +78,7 @@ public class QuartzScheduler implements SchedulerApi { } } catch (Exception e) { log.error("Failed to delete scheduler task, projectId: {}, schedulerId: {}", projectId, scheduleId, e); - throw new SchedulerException("Failed to delete scheduler task"); + throw new SchedulerException(QuartzSchedulerExceptionEnum.QUARTZ_DELETE_JOB_ERROR, e); } } @@ -166,8 +87,8 @@ public class QuartzScheduler implements SchedulerApi { // nothing to do try { scheduler.shutdown(); - } catch (org.quartz.SchedulerException e) { - throw new SchedulerException("Failed to shutdown scheduler", e); + } catch (Exception e) { + throw new SchedulerException(QuartzSchedulerExceptionEnum.QUARTZ_SCHEDULER_SHOWDOWN_ERROR, e); } } } diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java index 123e4cde50..07ff8af434 100644 --- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java @@ -19,14 +19,20 @@ package org.apache.dolphinscheduler.scheduler.quartz; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; +import org.quartz.Scheduler; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -@Configuration +@AutoConfiguration(after = {QuartzAutoConfiguration.class}) +@ConditionalOnClass(value = Scheduler.class) public class QuartzSchedulerConfiguration { @Bean - public SchedulerApi schedulerApi() { - return new QuartzScheduler(); + @ConditionalOnMissingBean + public SchedulerApi schedulerApi(Scheduler scheduler) { + return new QuartzScheduler(scheduler); } } diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzTriggerBuilder.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzTriggerBuilder.java new file mode 100644 index 0000000000..06c9101c5f --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzTriggerBuilder.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.quartz; + +import org.quartz.CronTrigger; + +public interface QuartzTriggerBuilder { + + CronTrigger build(); +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/exception/QuartzSchedulerExceptionEnum.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/exception/QuartzSchedulerExceptionEnum.java new file mode 100644 index 0000000000..76fa39e911 --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/exception/QuartzSchedulerExceptionEnum.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.quartz.exception; + +import org.apache.dolphinscheduler.scheduler.api.SchedulerExceptionEnum; + +public enum QuartzSchedulerExceptionEnum implements SchedulerExceptionEnum { + + QUARTZ_SCHEDULER_START_ERROR("QUARTZ-001", "Quartz Scheduler start error"), + QUARTZ_UPSERT_JOB_ERROR("QUARTZ-002", "Upsert quartz job error"), + QUARTZ_DELETE_JOB_ERROR("QUARTZ-003", "Delete quartz job error"), + QUARTZ_SCHEDULER_SHOWDOWN_ERROR("QUARTZ-004", "Quartz Scheduler shutdown error"), + ; + + private final String code; + + private final String message; + + QuartzSchedulerExceptionEnum(String code, String message) { + this.code = code; + this.message = message; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getMessage() { + return message; + } +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/utils/QuartzTaskUtils.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/utils/QuartzTaskUtils.java deleted file mode 100644 index e4f3471399..0000000000 --- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/utils/QuartzTaskUtils.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.scheduler.quartz.utils; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.Schedule; - -import java.util.HashMap; -import java.util.Map; - -import org.quartz.JobKey; - -public final class QuartzTaskUtils { - - public static final String QUARTZ_JOB_PREFIX = "job"; - public static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup"; - public static final String UNDERLINE = "_"; - public static final String PROJECT_ID = "projectId"; - public static final String SCHEDULE_ID = "scheduleId"; - public static final String SCHEDULE = "schedule"; - - /** - * @param schedulerId scheduler id - * @return quartz job name - */ - public static JobKey getJobKey(int schedulerId, int projectId) { - String jobName = QUARTZ_JOB_PREFIX + UNDERLINE + schedulerId; - String jobGroup = QUARTZ_JOB_GROUP_PREFIX + UNDERLINE + projectId; - return new JobKey(jobName, jobGroup); - } - - /** - * create quartz job data, include projectId and scheduleId, schedule. - */ - public static Map buildDataMap(int projectId, Schedule schedule) { - Map dataMap = new HashMap<>(8); - dataMap.put(PROJECT_ID, projectId); - dataMap.put(SCHEDULE_ID, schedule.getId()); - dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule)); - - return dataMap; - } - -} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobDataTest.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobDataTest.java new file mode 100644 index 0000000000..e9493cc782 --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobDataTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.quartz; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.quartz.JobDataMap; + +import com.google.common.collect.ImmutableMap; + +class QuartzJobDataTest { + + @Test + void of() { + QuartzJobData quartzJobData = QuartzJobData.of(1, 2); + assertEquals(1, quartzJobData.getProjectId()); + assertEquals(2, quartzJobData.getScheduleId()); + } + + @Test + void of_JobDataMap() { + + ImmutableMap map = ImmutableMap.of( + "projectId", 1, + "scheduleId", 2); + JobDataMap jobDataMap = new JobDataMap(map); + QuartzJobData quartzJobData = QuartzJobData.of(jobDataMap); + assertEquals(1, quartzJobData.getProjectId()); + assertEquals(2, quartzJobData.getScheduleId()); + } + + @Test + void toJobDataMap() { + QuartzJobData quartzJobData = QuartzJobData.of(1, 2); + JobDataMap jobDataMap = quartzJobData.toJobDataMap(); + QuartzJobData quartzJobData1 = QuartzJobData.of(jobDataMap); + assertEquals(1, quartzJobData1.getProjectId()); + assertEquals(2, quartzJobData1.getScheduleId()); + } +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobKeyTest.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobKeyTest.java new file mode 100644 index 0000000000..aafbadefb1 --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzJobKeyTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.quartz; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.quartz.JobKey; + +class QuartzJobKeyTest { + + @Test + void of() { + QuartzJobKey quartzJobKey = QuartzJobKey.of(1, 2); + assertEquals(1, quartzJobKey.getProjectId()); + assertEquals(2, quartzJobKey.getSchedulerId()); + } + + @Test + void toJobKey() { + QuartzJobKey quartzJobKey = QuartzJobKey.of(1, 2); + JobKey jobKey = quartzJobKey.toJobKey(); + assertEquals("job_2", jobKey.getName()); + assertEquals("jobgroup_1", jobKey.getGroup()); + } +} diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/exception/QuartzSchedulerExceptionEnumTest.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/exception/QuartzSchedulerExceptionEnumTest.java new file mode 100644 index 0000000000..943b2cc125 --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/test/java/org/apache/dolphinscheduler/scheduler/quartz/exception/QuartzSchedulerExceptionEnumTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.scheduler.quartz.exception; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.dolphinscheduler.scheduler.api.SchedulerException; + +import org.junit.jupiter.api.Test; + +class QuartzSchedulerExceptionEnumTest { + + @Test + void testException() { + SchedulerException schedulerException = + new SchedulerException(QuartzSchedulerExceptionEnum.QUARTZ_SCHEDULER_START_ERROR); + assertEquals("Scheduler[QUARTZ-001] Quartz Scheduler start error", schedulerException.getMessage()); + } + +}