From ddd4474189614e764a67fbef91aaddd22aaf9ce0 Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Wed, 9 Mar 2022 21:36:38 +0800 Subject: [PATCH] change quartz executor to be managed by spring (#8774) Co-authored-by: caishunfeng <534328519@qq.com> --- .../service/impl/SchedulerServiceImpl.java | 12 +++-- .../api/service/SchedulerServiceTest.java | 17 ++----- .../service/quartz/ProcessScheduleJob.java | 8 +++- .../service/quartz/QuartzExecutor.java | 47 +++++++++++++++++++ .../QuartzExecutorImpl.java} | 40 +++++++--------- 5 files changed, 83 insertions(+), 41 deletions(-) create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutor.java rename dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/{QuartzExecutors.java => impl/QuartzExecutorImpl.java} (86%) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index 268d89e77a..e46672f802 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -46,7 +46,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob; -import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; +import org.apache.dolphinscheduler.service.quartz.QuartzExecutor; +import org.apache.dolphinscheduler.service.quartz.impl.QuartzExecutorImpl; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.commons.lang.StringUtils; @@ -106,6 +107,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; + @Autowired + private QuartzExecutor quartzExecutor; + /** * save schedule * @@ -442,7 +446,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe public void setSchedule(int projectId, Schedule schedule) { logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId()); - QuartzExecutors.getInstance().addJob(scheduler, ProcessScheduleJob.class, projectId, schedule); + quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule); } /** @@ -456,8 +460,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe public void deleteSchedule(int projectId, int scheduleId) { logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId); - String jobName = QuartzExecutors.buildJobName(scheduleId); - String jobGroupName = QuartzExecutors.buildJobGroupName(projectId); + String jobName = quartzExecutor.buildJobName(scheduleId); + String jobGroupName = quartzExecutor.buildJobGroupName(projectId); JobKey jobKey = new JobKey(jobName, jobGroupName); try { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java index d2a83b9f46..6ac83e795d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java @@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; +import org.apache.dolphinscheduler.service.quartz.impl.QuartzExecutorImpl; import java.util.ArrayList; import java.util.HashMap; @@ -46,7 +46,6 @@ import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -54,7 +53,7 @@ import org.powermock.modules.junit4.PowerMockRunner; * scheduler service test */ @RunWith(PowerMockRunner.class) -@PrepareForTest(QuartzExecutors.class) +@PrepareForTest(QuartzExecutorImpl.class) public class SchedulerServiceTest { @InjectMocks @@ -81,20 +80,12 @@ public class SchedulerServiceTest { @Mock private ProjectServiceImpl projectService; - @Mock - private QuartzExecutors quartzExecutors; + @InjectMocks + private QuartzExecutorImpl quartzExecutors; @Before public void setUp() { - quartzExecutors = PowerMockito.mock(QuartzExecutors.class); - PowerMockito.mockStatic(QuartzExecutors.class); - try { - PowerMockito.doReturn(quartzExecutors).when(QuartzExecutors.class, "getInstance"); - } catch (Exception e) { - e.printStackTrace(); - } - } @Test diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java index 2f811c3bf2..ec5ed36f45 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.quartz.impl.QuartzExecutorImpl; import java.util.Date; @@ -46,6 +47,9 @@ public class ProcessScheduleJob extends QuartzJobBean { @Autowired private ProcessService processService; + @Autowired + private QuartzExecutor quartzExecutor; + @Counted(value = "quartz_job_executed") @Timed(value = "quartz_job_execution", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) @Override @@ -96,8 +100,8 @@ public class ProcessScheduleJob extends QuartzJobBean { private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) { final Scheduler scheduler = context.getScheduler(); - String jobName = QuartzExecutors.buildJobName(scheduleId); - String jobGroupName = QuartzExecutors.buildJobGroupName(projectId); + String jobName = quartzExecutor.buildJobName(scheduleId); + String jobGroupName = quartzExecutor.buildJobGroupName(projectId); JobKey jobKey = new JobKey(jobName, jobGroupName); try { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutor.java new file mode 100644 index 0000000000..e0e76e5652 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.quartz; + +import org.apache.dolphinscheduler.dao.entity.Schedule; + +import java.util.Map; + +import org.quartz.Job; + +public interface QuartzExecutor { + + /** + * build job name + */ + String buildJobName(int scheduleId); + + /** + * build job group name + */ + String buildJobGroupName(int projectId); + + /** + * build data map of job detail + */ + Map buildDataMap(int projectId, Schedule schedule); + + /** + * add job to quartz + */ + void addJob(Class clazz, int projectId, final Schedule schedule); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/impl/QuartzExecutorImpl.java similarity index 86% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/impl/QuartzExecutorImpl.java index bca3060206..9379909f6b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/impl/QuartzExecutorImpl.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.service.quartz; +package org.apache.dolphinscheduler.service.quartz.impl; import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID; import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PRIFIX; @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.service.exceptions.ServiceException; +import org.apache.dolphinscheduler.service.quartz.QuartzExecutor; import org.apache.commons.lang.StringUtils; @@ -49,22 +50,17 @@ import org.quartz.Scheduler; import org.quartz.TriggerKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; -public class QuartzExecutors { - private static final Logger logger = LoggerFactory.getLogger(QuartzExecutors.class); +@Service +public class QuartzExecutorImpl implements QuartzExecutor { + private static final Logger logger = LoggerFactory.getLogger(QuartzExecutorImpl.class); - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - private static final class Holder { - private static final QuartzExecutors instance = new QuartzExecutors(); - } + @Autowired + private Scheduler scheduler; - private QuartzExecutors() { - } - - public static QuartzExecutors getInstance() { - return Holder.instance; - } + private final ReadWriteLock lock = new ReentrantReadWriteLock(); /** * add task trigger , if this task already exists, return this task with updated trigger @@ -73,12 +69,12 @@ public class QuartzExecutors { * @param projectId projectId * @param schedule schedule */ - public void addJob(Scheduler scheduler, Class clazz, int projectId, final Schedule schedule) { - String jobName = QuartzExecutors.buildJobName(schedule.getId()); - String jobGroupName = QuartzExecutors.buildJobGroupName(projectId); + public void addJob(Class clazz, int projectId, final Schedule schedule) { + String jobName = this.buildJobName(schedule.getId()); + String jobGroupName = this.buildJobGroupName(projectId); Date startDate = schedule.getStartTime(); Date endDate = schedule.getEndTime(); - Map jobDataMap = QuartzExecutors.buildDataMap(projectId, schedule); + Map jobDataMap = this.buildDataMap(projectId, schedule); String cronExpression = schedule.getCrontab(); String timezoneId = schedule.getTimezoneId(); @@ -146,15 +142,15 @@ public class QuartzExecutors { } } - public static String buildJobName(int processId) { - return QUARTZ_JOB_PRIFIX + UNDERLINE + processId; + public String buildJobName(int scheduleId) { + return QUARTZ_JOB_PRIFIX + UNDERLINE + scheduleId; } - public static String buildJobGroupName(int projectId) { + public String buildJobGroupName(int projectId) { return QUARTZ_JOB_GROUP_PRIFIX + UNDERLINE + projectId; } - public static Map buildDataMap(int projectId, Schedule schedule) { + public Map buildDataMap(int projectId, Schedule schedule) { Map dataMap = new HashMap<>(8); dataMap.put(PROJECT_ID, projectId); dataMap.put(SCHEDULE_ID, schedule.getId());