Wenjun Ruan
8 months ago
committed by
GitHub
15 changed files with 535 additions and 175 deletions
@ -0,0 +1,26 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.api; |
||||
|
||||
public interface SchedulerExceptionEnum { |
||||
|
||||
String getCode(); |
||||
|
||||
String getMessage(); |
||||
|
||||
} |
@ -0,0 +1,95 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.DateUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.Schedule; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import org.quartz.CronScheduleBuilder; |
||||
import org.quartz.CronTrigger; |
||||
import org.quartz.JobKey; |
||||
import org.quartz.TriggerBuilder; |
||||
import org.quartz.TriggerKey; |
||||
|
||||
/** |
||||
* QuartzCornTriggerBuilder used to build a {@link CronTrigger} instance. |
||||
*/ |
||||
public class QuartzCornTriggerBuilder implements QuartzTriggerBuilder { |
||||
|
||||
private Integer projectId; |
||||
|
||||
private Schedule schedule; |
||||
|
||||
public static QuartzCornTriggerBuilder newBuilder() { |
||||
return new QuartzCornTriggerBuilder(); |
||||
} |
||||
|
||||
public QuartzCornTriggerBuilder withProjectId(Integer projectId) { |
||||
this.projectId = projectId; |
||||
return this; |
||||
} |
||||
|
||||
public QuartzCornTriggerBuilder withSchedule(Schedule schedule) { |
||||
this.schedule = schedule; |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public CronTrigger build() { |
||||
|
||||
if (projectId == null) { |
||||
throw new IllegalArgumentException("projectId cannot be null"); |
||||
} |
||||
if (schedule == null) { |
||||
throw new IllegalArgumentException("schedule cannot be null"); |
||||
} |
||||
|
||||
/** |
||||
* transform from server default timezone to schedule timezone |
||||
* e.g. server default timezone is `UTC` |
||||
* user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`, |
||||
* api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours, |
||||
* so when add job to quartz, it should recover by transform timezone |
||||
*/ |
||||
Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), schedule.getTimezoneId()); |
||||
Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), schedule.getTimezoneId()); |
||||
/** |
||||
* If the start time is less than the current time, the start time is set to the current time. |
||||
* We do this change to avoid misfires all triggers when update the scheduler. |
||||
*/ |
||||
Date now = new Date(); |
||||
if (startDate.before(now)) { |
||||
startDate = now; |
||||
} |
||||
JobKey jobKey = QuartzJobKey.of(projectId, schedule.getId()).toJobKey(); |
||||
|
||||
TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup()); |
||||
return TriggerBuilder.newTrigger() |
||||
.withIdentity(triggerKey) |
||||
.startAt(startDate) |
||||
.endAt(endDate) |
||||
.withSchedule( |
||||
CronScheduleBuilder.cronSchedule(schedule.getCrontab()) |
||||
.withMisfireHandlingInstructionIgnoreMisfires() |
||||
.inTimeZone(DateUtils.getTimezone(schedule.getTimezoneId()))) |
||||
.build(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,62 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz; |
||||
|
||||
import lombok.Getter; |
||||
|
||||
import org.quartz.JobDataMap; |
||||
|
||||
@Getter |
||||
public class QuartzJobData { |
||||
|
||||
private static final String PROJECT_ID = "projectId"; |
||||
private static final String SCHEDULE_ID = "scheduleId"; |
||||
|
||||
private final Integer projectId; |
||||
|
||||
private final Integer scheduleId; |
||||
|
||||
private QuartzJobData(Integer projectId, Integer scheduleId) { |
||||
if (projectId == null) { |
||||
throw new IllegalArgumentException("projectId cannot be null"); |
||||
} |
||||
if (scheduleId == null) { |
||||
throw new IllegalArgumentException("schedule cannot be null"); |
||||
} |
||||
this.projectId = projectId; |
||||
this.scheduleId = scheduleId; |
||||
} |
||||
|
||||
public static QuartzJobData of(Integer projectId, Integer scheduleId) { |
||||
return new QuartzJobData(projectId, scheduleId); |
||||
} |
||||
|
||||
public static QuartzJobData of(JobDataMap jobDataMap) { |
||||
Integer projectId = jobDataMap.getInt(PROJECT_ID); |
||||
Integer scheduleId = jobDataMap.getInt(SCHEDULE_ID); |
||||
return of(projectId, scheduleId); |
||||
} |
||||
|
||||
public JobDataMap toJobDataMap() { |
||||
JobDataMap jobDataMap = new JobDataMap(); |
||||
jobDataMap.put(PROJECT_ID, projectId); |
||||
jobDataMap.put(SCHEDULE_ID, scheduleId); |
||||
return jobDataMap; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,58 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz; |
||||
|
||||
import org.quartz.JobBuilder; |
||||
import org.quartz.JobDetail; |
||||
|
||||
public class QuartzJobDetailBuilder { |
||||
|
||||
private Integer projectId; |
||||
|
||||
private Integer scheduleId; |
||||
|
||||
public static QuartzJobDetailBuilder newBuilder() { |
||||
return new QuartzJobDetailBuilder(); |
||||
} |
||||
|
||||
public QuartzJobDetailBuilder withProjectId(Integer projectId) { |
||||
this.projectId = projectId; |
||||
return this; |
||||
} |
||||
|
||||
public QuartzJobDetailBuilder withSchedule(Integer scheduleId) { |
||||
this.scheduleId = scheduleId; |
||||
return this; |
||||
} |
||||
|
||||
public JobDetail build() { |
||||
if (projectId == null) { |
||||
throw new IllegalArgumentException("projectId cannot be null"); |
||||
} |
||||
if (scheduleId == null) { |
||||
throw new IllegalArgumentException("scheduleId cannot be null"); |
||||
} |
||||
QuartzJobData quartzJobData = QuartzJobData.of(projectId, scheduleId); |
||||
|
||||
return JobBuilder.newJob(ProcessScheduleTask.class) |
||||
.withIdentity(QuartzJobKey.of(projectId, scheduleId).toJobKey()) |
||||
.setJobData(quartzJobData.toJobDataMap()) |
||||
.build(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,49 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz; |
||||
|
||||
import lombok.Getter; |
||||
|
||||
import org.quartz.JobKey; |
||||
|
||||
@Getter |
||||
public class QuartzJobKey { |
||||
|
||||
private final int schedulerId; |
||||
private final int projectId; |
||||
|
||||
private static final String QUARTZ_JOB_PREFIX = "job"; |
||||
private static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup"; |
||||
private static final String UNDERLINE = "_"; |
||||
|
||||
private QuartzJobKey(int projectId, int schedulerId) { |
||||
this.schedulerId = schedulerId; |
||||
this.projectId = projectId; |
||||
} |
||||
|
||||
public static QuartzJobKey of(int projectId, int schedulerId) { |
||||
return new QuartzJobKey(projectId, schedulerId); |
||||
} |
||||
|
||||
public JobKey toJobKey() { |
||||
// todo: We don't need to add prefix to job name and job group?
|
||||
String jobName = QUARTZ_JOB_PREFIX + UNDERLINE + schedulerId; |
||||
String jobGroup = QUARTZ_JOB_GROUP_PREFIX + UNDERLINE + projectId; |
||||
return new JobKey(jobName, jobGroup); |
||||
} |
||||
} |
@ -0,0 +1,25 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz; |
||||
|
||||
import org.quartz.CronTrigger; |
||||
|
||||
public interface QuartzTriggerBuilder { |
||||
|
||||
CronTrigger build(); |
||||
} |
@ -0,0 +1,48 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz.exception; |
||||
|
||||
import org.apache.dolphinscheduler.scheduler.api.SchedulerExceptionEnum; |
||||
|
||||
public enum QuartzSchedulerExceptionEnum implements SchedulerExceptionEnum { |
||||
|
||||
QUARTZ_SCHEDULER_START_ERROR("QUARTZ-001", "Quartz Scheduler start error"), |
||||
QUARTZ_UPSERT_JOB_ERROR("QUARTZ-002", "Upsert quartz job error"), |
||||
QUARTZ_DELETE_JOB_ERROR("QUARTZ-003", "Delete quartz job error"), |
||||
QUARTZ_SCHEDULER_SHOWDOWN_ERROR("QUARTZ-004", "Quartz Scheduler shutdown error"), |
||||
; |
||||
|
||||
private final String code; |
||||
|
||||
private final String message; |
||||
|
||||
QuartzSchedulerExceptionEnum(String code, String message) { |
||||
this.code = code; |
||||
this.message = message; |
||||
} |
||||
|
||||
@Override |
||||
public String getCode() { |
||||
return code; |
||||
} |
||||
|
||||
@Override |
||||
public String getMessage() { |
||||
return message; |
||||
} |
||||
} |
@ -1,59 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz.utils; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.Schedule; |
||||
|
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
|
||||
import org.quartz.JobKey; |
||||
|
||||
public final class QuartzTaskUtils { |
||||
|
||||
public static final String QUARTZ_JOB_PREFIX = "job"; |
||||
public static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup"; |
||||
public static final String UNDERLINE = "_"; |
||||
public static final String PROJECT_ID = "projectId"; |
||||
public static final String SCHEDULE_ID = "scheduleId"; |
||||
public static final String SCHEDULE = "schedule"; |
||||
|
||||
/** |
||||
* @param schedulerId scheduler id |
||||
* @return quartz job name |
||||
*/ |
||||
public static JobKey getJobKey(int schedulerId, int projectId) { |
||||
String jobName = QUARTZ_JOB_PREFIX + UNDERLINE + schedulerId; |
||||
String jobGroup = QUARTZ_JOB_GROUP_PREFIX + UNDERLINE + projectId; |
||||
return new JobKey(jobName, jobGroup); |
||||
} |
||||
|
||||
/** |
||||
* create quartz job data, include projectId and scheduleId, schedule. |
||||
*/ |
||||
public static Map<String, Object> buildDataMap(int projectId, Schedule schedule) { |
||||
Map<String, Object> dataMap = new HashMap<>(8); |
||||
dataMap.put(PROJECT_ID, projectId); |
||||
dataMap.put(SCHEDULE_ID, schedule.getId()); |
||||
dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule)); |
||||
|
||||
return dataMap; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,56 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz; |
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import org.quartz.JobDataMap; |
||||
|
||||
import com.google.common.collect.ImmutableMap; |
||||
|
||||
class QuartzJobDataTest { |
||||
|
||||
@Test |
||||
void of() { |
||||
QuartzJobData quartzJobData = QuartzJobData.of(1, 2); |
||||
assertEquals(1, quartzJobData.getProjectId()); |
||||
assertEquals(2, quartzJobData.getScheduleId()); |
||||
} |
||||
|
||||
@Test |
||||
void of_JobDataMap() { |
||||
|
||||
ImmutableMap<String, Object> map = ImmutableMap.of( |
||||
"projectId", 1, |
||||
"scheduleId", 2); |
||||
JobDataMap jobDataMap = new JobDataMap(map); |
||||
QuartzJobData quartzJobData = QuartzJobData.of(jobDataMap); |
||||
assertEquals(1, quartzJobData.getProjectId()); |
||||
assertEquals(2, quartzJobData.getScheduleId()); |
||||
} |
||||
|
||||
@Test |
||||
void toJobDataMap() { |
||||
QuartzJobData quartzJobData = QuartzJobData.of(1, 2); |
||||
JobDataMap jobDataMap = quartzJobData.toJobDataMap(); |
||||
QuartzJobData quartzJobData1 = QuartzJobData.of(jobDataMap); |
||||
assertEquals(1, quartzJobData1.getProjectId()); |
||||
assertEquals(2, quartzJobData1.getScheduleId()); |
||||
} |
||||
} |
@ -0,0 +1,41 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz; |
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import org.quartz.JobKey; |
||||
|
||||
class QuartzJobKeyTest { |
||||
|
||||
@Test |
||||
void of() { |
||||
QuartzJobKey quartzJobKey = QuartzJobKey.of(1, 2); |
||||
assertEquals(1, quartzJobKey.getProjectId()); |
||||
assertEquals(2, quartzJobKey.getSchedulerId()); |
||||
} |
||||
|
||||
@Test |
||||
void toJobKey() { |
||||
QuartzJobKey quartzJobKey = QuartzJobKey.of(1, 2); |
||||
JobKey jobKey = quartzJobKey.toJobKey(); |
||||
assertEquals("job_2", jobKey.getName()); |
||||
assertEquals("jobgroup_1", jobKey.getGroup()); |
||||
} |
||||
} |
@ -0,0 +1,35 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.scheduler.quartz.exception; |
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals; |
||||
|
||||
import org.apache.dolphinscheduler.scheduler.api.SchedulerException; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
class QuartzSchedulerExceptionEnumTest { |
||||
|
||||
@Test |
||||
void testException() { |
||||
SchedulerException schedulerException = |
||||
new SchedulerException(QuartzSchedulerExceptionEnum.QUARTZ_SCHEDULER_START_ERROR); |
||||
assertEquals("Scheduler[QUARTZ-001] Quartz Scheduler start error", schedulerException.getMessage()); |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue