Browse Source

Add dolphinscheduler-scheduler module (#10360)

* Add dolphinscheduler-scheduler module
3.1.0-release
Wenjun Ruan 3 years ago committed by GitHub
parent
commit
2d3be6b36c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      dolphinscheduler-api/pom.xml
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  3. 31
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
  4. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
  5. 20
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  6. 4
      dolphinscheduler-master/pom.xml
  7. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  8. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  9. 37
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/pom.xml
  10. 56
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerApi.java
  11. 33
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerException.java
  12. 78
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/pom.xml
  13. 23
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
  14. 97
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzScheduler.java
  15. 32
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java
  16. 59
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/utils/QuartzTaskUtils.java
  17. 35
      dolphinscheduler-scheduler-plugin/pom.xml
  18. 14
      dolphinscheduler-service/pom.xml
  19. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/AbstractCycle.java
  20. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java
  21. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleFactory.java
  22. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleLinks.java
  23. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  24. 120
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java
  25. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  26. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  27. 12
      pom.xml

23
dolphinscheduler-api/pom.xml

@ -64,6 +64,10 @@
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId> <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-quartz</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.codehaus.janino</groupId> <groupId>org.codehaus.janino</groupId>
@ -121,25 +125,6 @@
<artifactId>spring-context</artifactId> <artifactId>spring-context</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<exclusions>
<exclusion>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
</exclusion>
<exclusion>
<groupId>com.mchange</groupId>
<artifactId>mchange-commons-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>io.springfox</groupId> <groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId> <artifactId>springfox-swagger2</artifactId>

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -67,8 +67,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;

31
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -45,10 +45,9 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutor;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -60,12 +59,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.quartz.CronExpression; import org.quartz.CronExpression;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -102,13 +99,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
private ProcessDefinitionMapper processDefinitionMapper; private ProcessDefinitionMapper processDefinitionMapper;
@Autowired @Autowired
private Scheduler scheduler; private SchedulerApi schedulerApi;
@Autowired @Autowired
private ProcessTaskRelationMapper processTaskRelationMapper; private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private QuartzExecutor quartzExecutor;
/** /**
* save schedule * save schedule
@ -460,8 +455,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
public void setSchedule(int projectId, Schedule schedule) { public void setSchedule(int projectId, Schedule schedule) {
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId()); logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());
schedulerApi.insertOrUpdateScheduleTask(projectId, schedule);
quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
} }
/** /**
@ -474,20 +468,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Override @Override
public void deleteSchedule(int projectId, int scheduleId) { public void deleteSchedule(int projectId, int scheduleId) {
logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId); logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);
schedulerApi.deleteScheduleTask(projectId, scheduleId);
String jobName = quartzExecutor.buildJobName(scheduleId);
String jobGroupName = quartzExecutor.buildJobGroupName(projectId);
JobKey jobKey = new JobKey(jobName, jobGroupName);
try {
if (scheduler.checkExists(jobKey)) {
logger.info("Try to delete job: {}, group name: {},", jobName, jobGroupName);
scheduler.deleteJob(jobKey);
}
} catch (SchedulerException e) {
logger.error("Failed to delete job: {}", jobKey);
throw new ServiceException("Failed to delete job: " + jobKey);
}
} }
/** /**

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java

@ -31,8 +31,9 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.scheduler.quartz.QuartzScheduler;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.impl.QuartzExecutorImpl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -53,7 +54,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
* scheduler service test * scheduler service test
*/ */
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest(QuartzExecutorImpl.class)
public class SchedulerServiceTest { public class SchedulerServiceTest {
@InjectMocks @InjectMocks
@ -80,8 +80,8 @@ public class SchedulerServiceTest {
@Mock @Mock
private ProjectServiceImpl projectService; private ProjectServiceImpl projectService;
@InjectMocks @Mock
private QuartzExecutorImpl quartzExecutors; private SchedulerApi schedulerApi;
@Before @Before
public void setUp() { public void setUp() {

20
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -494,26 +494,6 @@ public final class Constants {
* underline "_" * underline "_"
*/ */
public static final String UNDERLINE = "_"; public static final String UNDERLINE = "_";
/**
* quartz job prifix
*/
public static final String QUARTZ_JOB_PREFIX = "job";
/**
* quartz job group prifix
*/
public static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup";
/**
* projectId
*/
public static final String PROJECT_ID = "projectId";
/**
* processId
*/
public static final String SCHEDULE_ID = "scheduleId";
/**
* schedule
*/
public static final String SCHEDULE = "schedule";
/** /**
* application regex * application regex
*/ */

4
dolphinscheduler-master/pom.xml

@ -51,6 +51,10 @@
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId> <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-quartz</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor; import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
@ -77,7 +78,7 @@ public class MasterServer implements IStoppable {
private MasterSchedulerService masterSchedulerService; private MasterSchedulerService masterSchedulerService;
@Autowired @Autowired
private Scheduler scheduler; private SchedulerApi schedulerApi;
@Autowired @Autowired
private TaskExecuteRunningProcessor taskExecuteRunningProcessor; private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@ -154,7 +155,7 @@ public class MasterServer implements IStoppable {
this.eventExecuteService.start(); this.eventExecuteService.start();
this.failoverExecuteThread.start(); this.failoverExecuteThread.start();
this.scheduler.start(); this.schedulerApi.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) { if (Stopper.isRunning()) {
@ -188,6 +189,7 @@ public class MasterServer implements IStoppable {
logger.warn("thread sleep exception ", e); logger.warn("thread sleep exception ", e);
} }
// close // close
this.schedulerApi.close();
this.masterSchedulerService.close(); this.masterSchedulerService.close();
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.masterRegistryClient.closeRegistry(); this.masterRegistryClient.closeRegistry();

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import net.bytebuddy.implementation.bytecode.Throw;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
@ -71,8 +70,8 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;

37
dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/pom.xml

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-plugin</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-scheduler-api</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-dao</artifactId>
</dependency>
</dependencies>
</project>

56
dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerApi.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.scheduler.api;
import org.apache.dolphinscheduler.dao.entity.Schedule;
/**
* This is the interface for scheduler, contains methods to operate schedule task.
*/
public interface SchedulerApi extends AutoCloseable{
/**
* Start the scheduler, if not start, the scheduler will not execute task.
*
* @throws SchedulerException if start failed.
*/
void start() throws SchedulerException;
/**
* @param projectId project id, the schedule task belongs to.
* @param schedule schedule metadata.
* @throws SchedulerException if insert/update failed.
*/
void insertOrUpdateScheduleTask(int projectId, Schedule schedule) throws SchedulerException;
/**
* Delete a schedule task.
*
* @param projectId project id, the schedule task belongs to.
* @param scheduleId schedule id.
* @throws SchedulerException if delete failed.
*/
void deleteScheduleTask(int projectId, int scheduleId) throws SchedulerException;
/**
* Close the scheduler and release the resource.
*
* @throws SchedulerException if close failed.
*/
void close() throws Exception;
}

33
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutor.java → dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-api/src/main/java/org/apache/dolphinscheduler/scheduler/api/SchedulerException.java

@ -15,33 +15,16 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz; package org.apache.dolphinscheduler.scheduler.api;
import org.apache.dolphinscheduler.dao.entity.Schedule; public class SchedulerException extends RuntimeException {
import java.util.Map; public SchedulerException(String message) {
super(message);
}
import org.quartz.Job; public SchedulerException(String message, Throwable cause) {
super(message, cause);
}
public interface QuartzExecutor {
/**
* build job name
*/
String buildJobName(int scheduleId);
/**
* build job group name
*/
String buildJobGroupName(int projectId);
/**
* build data map of job detail
*/
Map<String, Object> buildDataMap(int projectId, Schedule schedule);
/**
* add job to quartz
*/
void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule);
} }

78
dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/pom.xml

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-plugin</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-scheduler-quartz</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-meter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<exclusions>
<exclusion>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
</exclusion>
<exclusion>
<groupId>com.mchange</groupId>
<artifactId>mchange-commons-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
</dependency>
</dependencies>
</project>

23
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java → dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz; package org.apache.dolphinscheduler.scheduler.quartz;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.impl.QuartzExecutorImpl;
import java.util.Date; import java.util.Date;
@ -41,14 +41,12 @@ import org.springframework.util.StringUtils;
import io.micrometer.core.annotation.Counted; import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed; import io.micrometer.core.annotation.Timed;
public class ProcessScheduleJob extends QuartzJobBean { public class ProcessScheduleTask extends QuartzJobBean {
private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class);
@Autowired private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleTask.class);
private ProcessService processService;
@Autowired @Autowired
private QuartzExecutor quartzExecutor; private ProcessService processService;
@Counted(value = "quartz_job_executed") @Counted(value = "quartz_job_executed")
@Timed(value = "quartz_job_execution", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) @Timed(value = "quartz_job_execution", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@ -56,8 +54,8 @@ public class ProcessScheduleJob extends QuartzJobBean {
protected void executeInternal(JobExecutionContext context) { protected void executeInternal(JobExecutionContext context) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap(); JobDataMap dataMap = context.getJobDetail().getJobDataMap();
int projectId = dataMap.getInt(Constants.PROJECT_ID); int projectId = dataMap.getInt(QuartzTaskUtils.PROJECT_ID);
int scheduleId = dataMap.getInt(Constants.SCHEDULE_ID); int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID);
Date scheduledFireTime = context.getScheduledFireTime(); Date scheduledFireTime = context.getScheduledFireTime();
@ -100,13 +98,10 @@ public class ProcessScheduleJob extends QuartzJobBean {
private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) { private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) {
final Scheduler scheduler = context.getScheduler(); final Scheduler scheduler = context.getScheduler();
String jobName = quartzExecutor.buildJobName(scheduleId); JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId, projectId);
String jobGroupName = quartzExecutor.buildJobGroupName(projectId);
JobKey jobKey = new JobKey(jobName, jobGroupName);
try { try {
if (scheduler.checkExists(jobKey)) { if (scheduler.checkExists(jobKey)) {
logger.info("Try to delete job: {}, group name: {},", jobName, jobGroupName); logger.info("Try to delete job: {}, projectId: {}, schedulerId", projectId, scheduleId);
scheduler.deleteJob(jobKey); scheduler.deleteJob(jobKey);
} }
} catch (Exception e) { } catch (Exception e) {

97
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/impl/QuartzExecutorImpl.java → dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzScheduler.java

@ -15,33 +15,25 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz.impl; package org.apache.dolphinscheduler.scheduler.quartz;
import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE;
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_ID;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.quartz.CronScheduleBuilder.cronSchedule; import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob; import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger; import static org.quartz.TriggerBuilder.newTrigger;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutor; import org.apache.dolphinscheduler.scheduler.api.SchedulerException;
import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Strings;
import org.quartz.CronTrigger; import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail; import org.quartz.JobDetail;
import org.quartz.JobKey; import org.quartz.JobKey;
import org.quartz.Scheduler; import org.quartz.Scheduler;
@ -49,30 +41,31 @@ import org.quartz.TriggerKey;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service import com.google.common.base.Strings;
public class QuartzExecutorImpl implements QuartzExecutor {
private static final Logger logger = LoggerFactory.getLogger(QuartzExecutorImpl.class); public class QuartzScheduler implements SchedulerApi {
private static final Logger logger = LoggerFactory.getLogger(QuartzScheduler.class);
@Autowired @Autowired
private Scheduler scheduler; private Scheduler scheduler;
private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* add task trigger , if this task already exists, return this task with updated trigger
*
* @param clazz job class name
* @param projectId projectId
* @param schedule schedule
*/
@Override @Override
public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) { public void start() throws SchedulerException {
String jobName = this.buildJobName(schedule.getId()); try {
String jobGroupName = this.buildJobGroupName(projectId); scheduler.start();
} catch (Exception e) {
throw new SchedulerException("Failed to start quartz scheduler ", e);
}
}
Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule); @Override
public void insertOrUpdateScheduleTask(int projectId, Schedule schedule) throws SchedulerException {
JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId(), projectId);
Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(projectId, schedule);
String cronExpression = schedule.getCrontab(); String cronExpression = schedule.getCrontab();
String timezoneId = schedule.getTimezoneId(); String timezoneId = schedule.getTimezoneId();
@ -89,7 +82,6 @@ public class QuartzExecutorImpl implements QuartzExecutor {
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
JobKey jobKey = new JobKey(jobName, jobGroupName);
JobDetail jobDetail; JobDetail jobDetail;
//add a task (if this task already exists, return this task directly) //add a task (if this task already exists, return this task directly)
if (scheduler.checkExists(jobKey)) { if (scheduler.checkExists(jobKey)) {
@ -97,17 +89,16 @@ public class QuartzExecutorImpl implements QuartzExecutor {
jobDetail = scheduler.getJobDetail(jobKey); jobDetail = scheduler.getJobDetail(jobKey);
jobDetail.getJobDataMap().putAll(jobDataMap); jobDetail.getJobDataMap().putAll(jobDataMap);
} else { } else {
jobDetail = newJob(clazz).withIdentity(jobKey).build(); jobDetail = newJob(ProcessScheduleTask.class).withIdentity(jobKey).build();
jobDetail.getJobDataMap().putAll(jobDataMap); jobDetail.getJobDataMap().putAll(jobDataMap);
scheduler.addJob(jobDetail, false, true); scheduler.addJob(jobDetail, false, true);
logger.info("Add job, job name: {}, group name: {}", logger.info("Add job, job name: {}, group name: {}", jobKey.getName(), jobKey.getGroup());
jobName, jobGroupName);
} }
TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName); TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());
/* /*
* Instructs the Scheduler that upon a mis-fire * Instructs the Scheduler that upon a mis-fire
* situation, the CronTrigger wants to have it's * situation, the CronTrigger wants to have it's
@ -135,39 +126,43 @@ public class QuartzExecutorImpl implements QuartzExecutor {
// reschedule job trigger // reschedule job trigger
scheduler.rescheduleJob(triggerKey, cronTrigger); scheduler.rescheduleJob(triggerKey, cronTrigger);
logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
jobName, jobGroupName, cronExpression, startDate, endDate); triggerKey.getName(), triggerKey.getGroup(), cronExpression, startDate, endDate);
} }
} else { } else {
scheduler.scheduleJob(cronTrigger); scheduler.scheduleJob(cronTrigger);
logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
jobName, jobGroupName, cronExpression, startDate, endDate); triggerKey.getName(), triggerKey.getGroup(), cronExpression, startDate, endDate);
} }
} catch (Exception e) { } catch (Exception e) {
throw new ServiceException("add job failed", e); logger.error("Failed to add scheduler task, projectId: {}, scheduler: {}", projectId, schedule, e);
throw new SchedulerException("Add schedule job failed", e);
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
} }
@Override @Override
public String buildJobName(int processId) { public void deleteScheduleTask(int projectId, int scheduleId) throws SchedulerException {
return QUARTZ_JOB_PREFIX + UNDERLINE + processId; JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId, projectId);
} try {
if (scheduler.checkExists(jobKey)) {
@Override logger.info("Try to delete scheduler task, projectId: {}, schedulerId: {}", projectId, scheduleId);
public String buildJobGroupName(int projectId) { scheduler.deleteJob(jobKey);
return QUARTZ_JOB_GROUP_PREFIX + UNDERLINE + projectId; }
} catch (Exception e) {
logger.error("Failed to delete scheduler task, projectId: {}, schedulerId: {}", projectId, scheduleId, e);
throw new SchedulerException("Failed to delete scheduler task");
}
} }
@Override @Override
public Map<String, Object> buildDataMap(int projectId, Schedule schedule) { public void close() {
Map<String, Object> dataMap = new HashMap<>(8); // nothing to do
dataMap.put(PROJECT_ID, projectId); try {
dataMap.put(SCHEDULE_ID, schedule.getId()); scheduler.shutdown();
dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule)); } catch (org.quartz.SchedulerException e) {
throw new SchedulerException("Failed to shutdown scheduler", e);
return dataMap; }
} }
} }

32
dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.scheduler.quartz;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QuartzSchedulerConfiguration {
@Bean
public SchedulerApi schedulerApi() {
return new QuartzScheduler();
}
}

59
dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/utils/QuartzTaskUtils.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.scheduler.quartz.utils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import java.util.HashMap;
import java.util.Map;
import org.quartz.JobKey;
public final class QuartzTaskUtils {
public static final String QUARTZ_JOB_PREFIX = "job";
public static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup";
public static final String UNDERLINE = "_";
public static final String PROJECT_ID = "projectId";
public static final String SCHEDULE_ID = "scheduleId";
public static final String SCHEDULE = "schedule";
/**
* @param schedulerId scheduler id
* @return quartz job name
*/
public static JobKey getJobKey(int schedulerId, int projectId) {
String jobName = QUARTZ_JOB_PREFIX + UNDERLINE + schedulerId;
String jobGroup = QUARTZ_JOB_GROUP_PREFIX + UNDERLINE + projectId;
return new JobKey(jobName, jobGroup);
}
/**
* create quartz job data, include projectId and scheduleId, schedule.
*/
public static Map<String, Object> buildDataMap(int projectId, Schedule schedule) {
Map<String, Object> dataMap = new HashMap<>(8);
dataMap.put(PROJECT_ID, projectId);
dataMap.put(SCHEDULE_ID, schedule.getId());
dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule));
return dataMap;
}
}

35
dolphinscheduler-scheduler-plugin/pom.xml

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<artifactId>dolphinscheduler-scheduler-plugin</artifactId>
<modules>
<module>dolphinscheduler-scheduler-api</module>
<module>dolphinscheduler-scheduler-quartz</module>
</modules>
</project>

14
dolphinscheduler-service/pom.xml

@ -52,14 +52,8 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>com.cronutils</groupId>
<artifactId>spring-boot-starter-quartz</artifactId> <artifactId>cron-utils</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.quartz-scheduler</groupId> <groupId>org.quartz-scheduler</groupId>
@ -79,10 +73,6 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
</dependency>
<dependency> <dependency>
<groupId>io.micrometer</groupId> <groupId>io.micrometer</groupId>

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/AbstractCycle.java

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz.cron; package org.apache.dolphinscheduler.service.corn;
import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.CycleEnum;

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java

@ -15,14 +15,14 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz.cron; package org.apache.dolphinscheduler.service.corn;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.day; import static org.apache.dolphinscheduler.service.corn.CycleFactory.day;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.hour; import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.min; import static org.apache.dolphinscheduler.service.corn.CycleFactory.min;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.month; import static org.apache.dolphinscheduler.service.corn.CycleFactory.month;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.week; import static org.apache.dolphinscheduler.service.corn.CycleFactory.week;
import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.year; import static org.apache.dolphinscheduler.service.corn.CycleFactory.year;
import static com.cronutils.model.CronType.QUARTZ; import static com.cronutils.model.CronType.QUARTZ;
@ -51,6 +51,7 @@ import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser; import com.cronutils.parser.CronParser;
/** /**
* // todo: this utils is heavy, it rely on quartz and corn-utils.
* cron utils * cron utils
*/ */
public class CronUtils { public class CronUtils {

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleFactory.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleFactory.java

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz.cron; package org.apache.dolphinscheduler.service.corn;
import com.cronutils.model.Cron; import com.cronutils.model.Cron;
import com.cronutils.model.field.expression.Always; import com.cronutils.model.field.expression.Always;

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleLinks.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleLinks.java

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz.cron; package org.apache.dolphinscheduler.service.corn;
import com.cronutils.model.Cron; import com.cronutils.model.Cron;
import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.CycleEnum;

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -128,9 +128,9 @@ import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;

120
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java → dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java

@ -14,26 +14,38 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz.cron;
import com.cronutils.builder.CronBuilder; package org.apache.dolphinscheduler.service.cron;
import com.cronutils.model.Cron;
import com.cronutils.model.CronType; import static com.cronutils.model.field.expression.FieldExpressionFactory.always;
import com.cronutils.model.definition.CronDefinitionBuilder; import static com.cronutils.model.field.expression.FieldExpressionFactory.every;
import com.cronutils.model.field.CronField; import static com.cronutils.model.field.expression.FieldExpressionFactory.on;
import com.cronutils.model.field.CronFieldName; import static com.cronutils.model.field.expression.FieldExpressionFactory.questionMark;
import com.cronutils.model.field.expression.*;
import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import java.text.ParseException;
import java.util.Date;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.text.ParseException; import com.cronutils.builder.CronBuilder;
import java.util.Date; import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import static com.cronutils.model.field.expression.FieldExpressionFactory.*; import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.field.CronField;
import com.cronutils.model.field.CronFieldName;
import com.cronutils.model.field.expression.Always;
import com.cronutils.model.field.expression.And;
import com.cronutils.model.field.expression.Between;
import com.cronutils.model.field.expression.Every;
import com.cronutils.model.field.expression.On;
import com.cronutils.model.field.expression.QuestionMark;
/** /**
* CronUtilsTest * CronUtilsTest
@ -66,6 +78,7 @@ public class CronUtilsTest {
/** /**
* cron parse test * cron parse test
*
* @throws ParseException if error throws ParseException * @throws ParseException if error throws ParseException
*/ */
@Test @Test
@ -83,6 +96,7 @@ public class CronUtilsTest {
/** /**
* schedule type test * schedule type test
*
* @throws ParseException if error throws ParseException * @throws ParseException if error throws ParseException
*/ */
@Test @Test
@ -115,7 +129,7 @@ public class CronUtilsTest {
* test * test
*/ */
@Test @Test
public void test2(){ public void test2() {
Cron cron1 = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) Cron cron1 = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
.withYear(always()) .withYear(always())
.withDoW(questionMark()) .withDoW(questionMark())
@ -126,62 +140,62 @@ public class CronUtilsTest {
.withSecond(on(0)) .withSecond(on(0))
.instance(); .instance();
// minute cycle // minute cycle
String[] cronArayy = new String[]{"* * * * * ? *","* 0 * * * ? *", String[] cronArayy = new String[] {"* * * * * ? *", "* 0 * * * ? *",
"* 5 * * 3/5 ? *","0 0 * * * ? *", "0 0 7 * 1 ? *", "0 0 7 * 1/1 ? *", "0 0 7 * 1-2 ? *" , "0 0 7 * 1,2 ? *"}; "* 5 * * 3/5 ? *", "0 0 * * * ? *", "0 0 7 * 1 ? *", "0 0 7 * 1/1 ? *", "0 0 7 * 1-2 ? *", "0 0 7 * 1,2 ? *"};
for(String minCrontab:cronArayy){ for (String minCrontab : cronArayy) {
if (!org.quartz.CronExpression.isValidExpression(minCrontab)) { if (!org.quartz.CronExpression.isValidExpression(minCrontab)) {
throw new RuntimeException(minCrontab+" verify failure, cron expression not valid"); throw new RuntimeException(minCrontab + " verify failure, cron expression not valid");
} }
Cron cron = CronUtils.parse2Cron(minCrontab); Cron cron = CronUtils.parse2Cron(minCrontab);
CronField minField = cron.retrieve(CronFieldName.MINUTE); CronField minField = cron.retrieve(CronFieldName.MINUTE);
logger.info("minField instanceof Between:"+(minField.getExpression() instanceof Between)); logger.info("minField instanceof Between:" + (minField.getExpression() instanceof Between));
logger.info("minField instanceof Every:"+(minField.getExpression() instanceof Every)); logger.info("minField instanceof Every:" + (minField.getExpression() instanceof Every));
logger.info("minField instanceof Always:" + (minField.getExpression() instanceof Always)); logger.info("minField instanceof Always:" + (minField.getExpression() instanceof Always));
logger.info("minField instanceof On:"+(minField.getExpression() instanceof On)); logger.info("minField instanceof On:" + (minField.getExpression() instanceof On));
logger.info("minField instanceof And:"+(minField.getExpression() instanceof And)); logger.info("minField instanceof And:" + (minField.getExpression() instanceof And));
CronField hourField = cron.retrieve(CronFieldName.HOUR); CronField hourField = cron.retrieve(CronFieldName.HOUR);
logger.info("hourField instanceof Between:"+(hourField.getExpression() instanceof Between)); logger.info("hourField instanceof Between:" + (hourField.getExpression() instanceof Between));
logger.info("hourField instanceof Always:"+(hourField.getExpression() instanceof Always)); logger.info("hourField instanceof Always:" + (hourField.getExpression() instanceof Always));
logger.info("hourField instanceof Every:"+(hourField.getExpression() instanceof Every)); logger.info("hourField instanceof Every:" + (hourField.getExpression() instanceof Every));
logger.info("hourField instanceof On:"+(hourField.getExpression() instanceof On)); logger.info("hourField instanceof On:" + (hourField.getExpression() instanceof On));
logger.info("hourField instanceof And:"+(hourField.getExpression() instanceof And)); logger.info("hourField instanceof And:" + (hourField.getExpression() instanceof And));
CronField dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH); CronField dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH);
logger.info("dayOfMonthField instanceof Between:"+(dayOfMonthField.getExpression() instanceof Between)); logger.info("dayOfMonthField instanceof Between:" + (dayOfMonthField.getExpression() instanceof Between));
logger.info("dayOfMonthField instanceof Always:"+(dayOfMonthField.getExpression() instanceof Always)); logger.info("dayOfMonthField instanceof Always:" + (dayOfMonthField.getExpression() instanceof Always));
logger.info("dayOfMonthField instanceof Every:"+(dayOfMonthField.getExpression() instanceof Every)); logger.info("dayOfMonthField instanceof Every:" + (dayOfMonthField.getExpression() instanceof Every));
logger.info("dayOfMonthField instanceof On:"+(dayOfMonthField.getExpression() instanceof On)); logger.info("dayOfMonthField instanceof On:" + (dayOfMonthField.getExpression() instanceof On));
logger.info("dayOfMonthField instanceof And:"+(dayOfMonthField.getExpression() instanceof And)); logger.info("dayOfMonthField instanceof And:" + (dayOfMonthField.getExpression() instanceof And));
logger.info("dayOfMonthField instanceof QuestionMark:"+(dayOfMonthField.getExpression() instanceof QuestionMark)); logger.info("dayOfMonthField instanceof QuestionMark:" + (dayOfMonthField.getExpression() instanceof QuestionMark));
CronField monthField = cron.retrieve(CronFieldName.MONTH); CronField monthField = cron.retrieve(CronFieldName.MONTH);
logger.info("monthField instanceof Between:"+(monthField.getExpression() instanceof Between)); logger.info("monthField instanceof Between:" + (monthField.getExpression() instanceof Between));
logger.info("monthField instanceof Always:"+(monthField.getExpression() instanceof Always)); logger.info("monthField instanceof Always:" + (monthField.getExpression() instanceof Always));
logger.info("monthField instanceof Every:"+(monthField.getExpression() instanceof Every)); logger.info("monthField instanceof Every:" + (monthField.getExpression() instanceof Every));
logger.info("monthField instanceof On:"+(monthField.getExpression() instanceof On)); logger.info("monthField instanceof On:" + (monthField.getExpression() instanceof On));
logger.info("monthField instanceof And:"+(monthField.getExpression() instanceof And)); logger.info("monthField instanceof And:" + (monthField.getExpression() instanceof And));
logger.info("monthField instanceof QuestionMark:"+(monthField.getExpression() instanceof QuestionMark)); logger.info("monthField instanceof QuestionMark:" + (monthField.getExpression() instanceof QuestionMark));
CronField dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK); CronField dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK);
logger.info("dayOfWeekField instanceof Between:"+(dayOfWeekField.getExpression() instanceof Between)); logger.info("dayOfWeekField instanceof Between:" + (dayOfWeekField.getExpression() instanceof Between));
logger.info("dayOfWeekField instanceof Always:"+(dayOfWeekField.getExpression() instanceof Always)); logger.info("dayOfWeekField instanceof Always:" + (dayOfWeekField.getExpression() instanceof Always));
logger.info("dayOfWeekField instanceof Every:"+(dayOfWeekField.getExpression() instanceof Every)); logger.info("dayOfWeekField instanceof Every:" + (dayOfWeekField.getExpression() instanceof Every));
logger.info("dayOfWeekField instanceof On:"+(dayOfWeekField.getExpression() instanceof On)); logger.info("dayOfWeekField instanceof On:" + (dayOfWeekField.getExpression() instanceof On));
logger.info("dayOfWeekField instanceof And:"+(dayOfWeekField.getExpression() instanceof And)); logger.info("dayOfWeekField instanceof And:" + (dayOfWeekField.getExpression() instanceof And));
logger.info("dayOfWeekField instanceof QuestionMark:"+(dayOfWeekField.getExpression() instanceof QuestionMark)); logger.info("dayOfWeekField instanceof QuestionMark:" + (dayOfWeekField.getExpression() instanceof QuestionMark));
CronField yearField = cron.retrieve(CronFieldName.YEAR); CronField yearField = cron.retrieve(CronFieldName.YEAR);
logger.info("yearField instanceof Between:"+(yearField.getExpression() instanceof Between)); logger.info("yearField instanceof Between:" + (yearField.getExpression() instanceof Between));
logger.info("yearField instanceof Always:"+(yearField.getExpression() instanceof Always)); logger.info("yearField instanceof Always:" + (yearField.getExpression() instanceof Always));
logger.info("yearField instanceof Every:"+(yearField.getExpression() instanceof Every)); logger.info("yearField instanceof Every:" + (yearField.getExpression() instanceof Every));
logger.info("yearField instanceof On:"+(yearField.getExpression() instanceof On)); logger.info("yearField instanceof On:" + (yearField.getExpression() instanceof On));
logger.info("yearField instanceof And:"+(yearField.getExpression() instanceof And)); logger.info("yearField instanceof And:" + (yearField.getExpression() instanceof And));
logger.info("yearField instanceof QuestionMark:"+(yearField.getExpression() instanceof QuestionMark)); logger.info("yearField instanceof QuestionMark:" + (yearField.getExpression() instanceof QuestionMark));
CycleEnum cycleEnum = CronUtils.getMaxCycle(minCrontab); CycleEnum cycleEnum = CronUtils.getMaxCycle(minCrontab);
if(cycleEnum !=null){ if (cycleEnum != null) {
logger.info(cycleEnum.name()); logger.info(cycleEnum.name());
}else{ } else {
logger.info("can't get scheduleType"); logger.info("can't get scheduleType");
} }
} }
@ -213,7 +227,7 @@ public class CronUtilsTest {
} }
@Test @Test
public void getExpirationTime(){ public void getExpirationTime() {
Date startTime = DateUtils.stringToDate("2020-02-07 18:30:00"); Date startTime = DateUtils.stringToDate("2020-02-07 18:30:00");
Date expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.HOUR); Date expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.HOUR);
Assert.assertEquals("2020-02-07 19:30:00", DateUtils.dateToString(expirationTime)); Assert.assertEquals("2020-02-07 19:30:00", DateUtils.dateToString(expirationTime));

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -72,7 +72,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest; import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
import org.apache.dolphinscheduler.spi.params.base.FormType; import org.apache.dolphinscheduler.spi.params.base.FormType;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -63,8 +63,6 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
excludeFilters = { excludeFilters = {
@ComponentScan.Filter(type = FilterType.REGEX, pattern = { @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"org.apache.dolphinscheduler.service.process.*", "org.apache.dolphinscheduler.service.process.*",
// todo: split the quartz into a single module
"org.apache.dolphinscheduler.service.quartz.*",
"org.apache.dolphinscheduler.service.queue.*", "org.apache.dolphinscheduler.service.queue.*",
}) })
} }

12
pom.xml

@ -386,6 +386,17 @@
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-scheduler-quartz</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-all</artifactId> <artifactId>dolphinscheduler-datasource-all</artifactId>
@ -1254,5 +1265,6 @@
<module>dolphinscheduler-log-server</module> <module>dolphinscheduler-log-server</module>
<module>dolphinscheduler-tools</module> <module>dolphinscheduler-tools</module>
<module>dolphinscheduler-ui</module> <module>dolphinscheduler-ui</module>
<module>dolphinscheduler-scheduler-plugin</module>
</modules> </modules>
</project> </project>

Loading…
Cancel
Save