From 5d5125d50760ac7846d78b12c270143bbbe5a037 Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Fri, 29 Apr 2022 10:17:34 +0800 Subject: [PATCH] [Bug-9829][Api] Fix schedule timezone (#9830) --- .../api/service/impl/ExecutorServiceImpl.java | 33 +- .../service/impl/SchedulerServiceImpl.java | 24 +- .../dolphinscheduler/api/vo/ScheduleVo.java | 337 ++++++++++++++++++ .../common/utils/DateUtils.java | 21 +- .../common/utils/DateUtilsTest.java | 9 +- .../common/utils/JSONUtilsTest.java | 3 + .../quartz/impl/QuartzExecutorImpl.java | 56 +-- 7 files changed, 439 insertions(+), 44 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/ScheduleVo.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index fe562ef41d..5f72be37ed 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -44,8 +44,23 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; @@ -58,7 +73,13 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -223,7 +244,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { // check process definition online putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version); - } else if (!checkSubProcessDefinitionValid(processDefinition)){ + } else if (!checkSubProcessDefinitionValid(processDefinition)) { // check sub process definition online putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE); } else { @@ -241,7 +262,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition) { // query all subprocesses under the current process List processTaskRelations = processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode()); - if (processTaskRelations.isEmpty()){ + if (processTaskRelations.isEmpty()) { return true; } Set relationCodes = processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet()); @@ -252,7 +273,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ taskDefinitions.stream() .filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())) .forEach(taskDefinition -> processDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString(taskDefinition.getTaskParams(), Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)))); - if (processDefinitionCodeSet.isEmpty()){ + if (processDefinitionCodeSet.isEmpty()) { return true; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index 52b22577c3..0909d3e582 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.SchedulerService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.api.vo.ScheduleVo; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; @@ -47,7 +48,6 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob; import org.apache.dolphinscheduler.service.quartz.QuartzExecutor; -import org.apache.dolphinscheduler.service.quartz.impl.QuartzExecutorImpl; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.commons.lang.StringUtils; @@ -173,6 +173,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe putMsg(result, Status.START_TIME_BIGGER_THAN_END_TIME_ERROR); return result; } + scheduleObj.setStartTime(scheduleParam.getStartTime()); scheduleObj.setEndTime(scheduleParam.getEndTime()); if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) { @@ -414,9 +415,14 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe IPage scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode, searchVal); - PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + List scheduleList = new ArrayList<>(); + for (Schedule schedule : scheduleIPage.getRecords()) { + scheduleList.add(new ScheduleVo(schedule)); + } + + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); pageInfo.setTotal((int) scheduleIPage.getTotal()); - pageInfo.setTotalList(scheduleIPage.getRecords()); + pageInfo.setTotalList(scheduleList); result.setData(pageInfo); putMsg(result, Status.SUCCESS); return result; @@ -441,8 +447,12 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe } List schedules = scheduleMapper.querySchedulerListByProjectName(project.getName()); + List scheduleList = new ArrayList<>(); + for (Schedule schedule : schedules) { + scheduleList.add(new ScheduleVo(schedule)); + } - result.put(Constants.DATA_LIST, schedules); + result.put(Constants.DATA_LIST, scheduleList); putMsg(result, Status.SUCCESS); return result; @@ -561,8 +571,10 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class); Date now = new Date(); - Date startTime = now.after(scheduleParam.getStartTime()) ? now : scheduleParam.getStartTime(); - Date endTime = scheduleParam.getEndTime(); + Date startTime = DateUtils.transformTimezoneDate(scheduleParam.getStartTime(), scheduleParam.getTimezoneId()); + Date endTime = DateUtils.transformTimezoneDate(scheduleParam.getEndTime(), scheduleParam.getTimezoneId()); + startTime = now.after(startTime) ? now : startTime; + try { cronExpression = CronUtils.parse2CronExpression(scheduleParam.getCrontab()); } catch (ParseException e) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/ScheduleVo.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/ScheduleVo.java new file mode 100644 index 0000000000..0137c76ac2 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/ScheduleVo.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.vo; + +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.entity.Schedule; + +import java.time.ZoneId; +import java.util.Date; + +public class ScheduleVo { + + private int id; + + /** + * process definition code + */ + private long processDefinitionCode; + + /** + * process definition name + */ + private String processDefinitionName; + + /** + * project name + */ + private String projectName; + + /** + * schedule description + */ + private String definitionDescription; + + /** + * schedule start time + */ + private String startTime; + + /** + * schedule end time + */ + private String endTime; + + /** + * timezoneId + *

see {@link java.util.TimeZone#getTimeZone(String)} + */ + private String timezoneId; + + /** + * crontab expression + */ + private String crontab; + + /** + * failure strategy + */ + private FailureStrategy failureStrategy; + + /** + * warning type + */ + private WarningType warningType; + + /** + * create time + */ + private Date createTime; + + /** + * update time + */ + private Date updateTime; + + /** + * created user id + */ + private int userId; + + /** + * created user name + */ + private String userName; + + /** + * release state + */ + private ReleaseState releaseState; + + /** + * warning group id + */ + private int warningGroupId; + + + /** + * process instance priority + */ + private Priority processInstancePriority; + + /** + * worker group + */ + private String workerGroup; + + /** + * environment code + */ + private Long environmentCode; + + public ScheduleVo(Schedule schedule) { + this.setId(schedule.getId()); + this.setCrontab(schedule.getCrontab()); + this.setProjectName(schedule.getProjectName()); + this.setUserName(schedule.getUserName()); + this.setWorkerGroup(schedule.getWorkerGroup()); + this.setWarningType(schedule.getWarningType()); + this.setWarningGroupId(schedule.getWarningGroupId()); + this.setUserId(schedule.getUserId()); + this.setUpdateTime(schedule.getUpdateTime()); + this.setTimezoneId(schedule.getTimezoneId()); + this.setReleaseState(schedule.getReleaseState()); + this.setProcessInstancePriority(schedule.getProcessInstancePriority()); + this.setProcessDefinitionName(schedule.getProcessDefinitionName()); + this.setProcessDefinitionCode(schedule.getProcessDefinitionCode()); + this.setFailureStrategy(schedule.getFailureStrategy()); + this.setEnvironmentCode(schedule.getEnvironmentCode()); + this.setStartTime(DateUtils.dateToString(schedule.getStartTime(), ZoneId.systemDefault().getId())); + this.setEndTime(DateUtils.dateToString(schedule.getEndTime(), ZoneId.systemDefault().getId())); + } + + public int getWarningGroupId() { + return warningGroupId; + } + + public void setWarningGroupId(int warningGroupId) { + this.warningGroupId = warningGroupId; + } + + public String getProjectName() { + return projectName; + } + + public void setProjectName(String projectName) { + this.projectName = projectName; + } + + public String getStartTime() { + return startTime; + } + + public void setStartTime(String startTime) { + this.startTime = startTime; + } + + public String getEndTime() { + return endTime; + } + + public void setEndTime(String endTime) { + this.endTime = endTime; + } + + public String getTimezoneId() { + return timezoneId; + } + + public void setTimezoneId(String timezoneId) { + this.timezoneId = timezoneId; + } + + public String getCrontab() { + return crontab; + } + + public void setCrontab(String crontab) { + this.crontab = crontab; + } + + public FailureStrategy getFailureStrategy() { + return failureStrategy; + } + + public void setFailureStrategy(FailureStrategy failureStrategy) { + this.failureStrategy = failureStrategy; + } + + public WarningType getWarningType() { + return warningType; + } + + public void setWarningType(WarningType warningType) { + this.warningType = warningType; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public ReleaseState getReleaseState() { + return releaseState; + } + + public void setReleaseState(ReleaseState releaseState) { + this.releaseState = releaseState; + } + + public long getProcessDefinitionCode() { + return processDefinitionCode; + } + + public void setProcessDefinitionCode(long processDefinitionCode) { + this.processDefinitionCode = processDefinitionCode; + } + + public String getProcessDefinitionName() { + return processDefinitionName; + } + + public void setProcessDefinitionName(String processDefinitionName) { + this.processDefinitionName = processDefinitionName; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } + + public int getUserId() { + return userId; + } + + public void setUserId(int userId) { + this.userId = userId; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public Priority getProcessInstancePriority() { + return processInstancePriority; + } + + public void setProcessInstancePriority(Priority processInstancePriority) { + this.processInstancePriority = processInstancePriority; + } + + public String getWorkerGroup() { + return workerGroup; + } + + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + + public Long getEnvironmentCode() { + return this.environmentCode; + } + + public void setEnvironmentCode(Long environmentCode) { + this.environmentCode = environmentCode; + } + + @Override + public String toString() { + return "Schedule{" + + "id=" + id + + ", processDefinitionCode=" + processDefinitionCode + + ", processDefinitionName='" + processDefinitionName + '\'' + + ", projectName='" + projectName + '\'' + + ", description='" + definitionDescription + '\'' + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", timezoneId='" + timezoneId + +'\'' + + ", crontab='" + crontab + '\'' + + ", failureStrategy=" + failureStrategy + + ", warningType=" + warningType + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + ", userId=" + userId + + ", userName='" + userName + '\'' + + ", releaseState=" + releaseState + + ", warningGroupId=" + warningGroupId + + ", processInstancePriority=" + processInstancePriority + + ", workerGroup='" + workerGroup + '\'' + + ", environmentCode='" + environmentCode + '\'' + + '}'; + } + + public String getDefinitionDescription() { + return definitionDescription; + } + + public void setDefinitionDescription(String definitionDescription) { + this.definitionDescription = definitionDescription; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java index 3cc7d5f340..6ee9ac99d7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java @@ -536,17 +536,24 @@ public final class DateUtils { /** * transform date to target timezone date + * sourceTimeZoneId is system default timezone + */ + public static Date transformTimezoneDate(Date date, String targetTimezoneId) { + return transformTimezoneDate(date, ZoneId.systemDefault().getId(), targetTimezoneId); + } + + /** + * transform date from source timezone date to target timezone date *

e.g. - *

if input date is 2020-01-01 00:00:00 current timezone is CST - *

targetTimezoneId is MST - *

this method will return 2020-01-01 15:00:00 + *

if input date is `Thu Apr 28 10:00:00 UTC 2022`, sourceTimezoneId is UTC + *

targetTimezoneId is Asia/Shanghai + *

this method will return `Thu Apr 28 02:00:00 UTC 2022` */ - public static Date getTimezoneDate(Date date, String targetTimezoneId) { - if (StringUtils.isEmpty(targetTimezoneId)) { + public static Date transformTimezoneDate(Date date, String sourceTimezoneId, String targetTimezoneId) { + if (StringUtils.isEmpty(sourceTimezoneId) || StringUtils.isEmpty(targetTimezoneId)) { return date; } - - String dateToString = dateToString(date); + String dateToString = dateToString(date, sourceTimezoneId); LocalDateTime localDateTime = LocalDateTime.parse(dateToString, DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS)); ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime, TimeZone.getTimeZone(targetTimezoneId).toZoneId()); return Date.from(zonedDateTime.toInstant()); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java index e61f581155..96c4450923 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java @@ -214,9 +214,14 @@ public class DateUtilsTest { @Test public void testTransformToTimezone() { + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); + Date date = new Date(); - Date mst = DateUtils.getTimezoneDate(date, TimeZone.getDefault().getID()); - Assert.assertEquals(DateUtils.dateToString(date), DateUtils.dateToString(mst)); + Date defaultTimeZoneDate = DateUtils.transformTimezoneDate(date, TimeZone.getDefault().getID()); + Assert.assertEquals(DateUtils.dateToString(date), DateUtils.dateToString(defaultTimeZoneDate)); + + Date targetTimeZoneDate = DateUtils.transformTimezoneDate(date, TimeZone.getDefault().getID(), "Asia/Shanghai"); + Assert.assertEquals(DateUtils.dateToString(date, TimeZone.getDefault().getID()), DateUtils.dateToString(targetTimeZoneDate, "Asia/Shanghai")); } @Test diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java index b32640c189..2d638c943e 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.TimeZone; import org.junit.Assert; import org.junit.Test; @@ -261,6 +262,7 @@ public class JSONUtilsTest { @Test public void dateToString() { + TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); String time = "2022-02-22 13:38:24"; Date date = DateUtils.stringToDate(time); String json = JSONUtils.toJsonString(date); @@ -272,6 +274,7 @@ public class JSONUtilsTest { @Test public void stringToDate() { + TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); String json = "\"2022-02-22 13:38:24\""; Date date = JSONUtils.parseObject(json, Date.class); Assert.assertEquals(date, DateUtils.stringToDate("2022-02-22 13:38:24")); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/impl/QuartzExecutorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/impl/QuartzExecutorImpl.java index cd5f7803df..b4d5ee9246 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/impl/QuartzExecutorImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/impl/QuartzExecutorImpl.java @@ -17,12 +17,31 @@ package org.apache.dolphinscheduler.service.quartz.impl; -import org.apache.commons.lang.StringUtils; +import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.SCHEDULE; +import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_ID; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; + +import static org.quartz.CronScheduleBuilder.cronSchedule; +import static org.quartz.JobBuilder.newJob; +import static org.quartz.TriggerBuilder.newTrigger; + import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.quartz.QuartzExecutor; + +import org.apache.commons.lang.StringUtils; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.quartz.CronTrigger; import org.quartz.Job; import org.quartz.JobDetail; @@ -34,22 +53,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID; -import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PREFIX; -import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_PREFIX; -import static org.apache.dolphinscheduler.common.Constants.SCHEDULE; -import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_ID; -import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import static org.quartz.CronScheduleBuilder.cronSchedule; -import static org.quartz.JobBuilder.newJob; -import static org.quartz.TriggerBuilder.newTrigger; - @Service public class QuartzExecutorImpl implements QuartzExecutor { private static final Logger logger = LoggerFactory.getLogger(QuartzExecutorImpl.class); @@ -70,12 +73,21 @@ public class QuartzExecutorImpl implements QuartzExecutor { public void addJob(Class clazz, int projectId, final Schedule schedule) { String jobName = this.buildJobName(schedule.getId()); String jobGroupName = this.buildJobGroupName(projectId); - Date startDate = schedule.getStartTime(); - Date endDate = schedule.getEndTime(); + Map jobDataMap = this.buildDataMap(projectId, schedule); String cronExpression = schedule.getCrontab(); String timezoneId = schedule.getTimezoneId(); + /** + * transform from server default timezone to schedule timezone + * e.g. server default timezone is `UTC` + * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`, + * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours, + * so when add job to quartz, it should recover by transform timezone + */ + Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId); + Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId); + lock.writeLock().lock(); try { @@ -107,8 +119,8 @@ public class QuartzExecutorImpl implements QuartzExecutor { */ CronTrigger cronTrigger = newTrigger() .withIdentity(triggerKey) - .startAt(DateUtils.getTimezoneDate(startDate, timezoneId)) - .endAt(DateUtils.getTimezoneDate(endDate, timezoneId)) + .startAt(startDate) + .endAt(endDate) .withSchedule( cronSchedule(cronExpression) .withMisfireHandlingInstructionDoNothing() @@ -140,13 +152,11 @@ public class QuartzExecutorImpl implements QuartzExecutor { } } - @Override public String buildJobName(int processId) { return QUARTZ_JOB_PREFIX + UNDERLINE + processId; } - @Override public String buildJobGroupName(int projectId) { return QUARTZ_JOB_GROUP_PREFIX + UNDERLINE + projectId;