Browse Source

[Fix-13292] [Api] When serial complement is a process configured with timing scheduling, an error will occur when selecting a time that is not within the timing range (#13356)

* [Fix-13292] [Api] When serial complement is a process configured with timing scheduling, an error will occur when selecting a time that is not within the timing range

* [Fix-13292] [Api] When serial complement is a process configured with timing scheduling, an error will occur when selecting a time that is not within the timing range
2.0.8-release
Molin Wang 2 years ago committed by GitHub
parent
commit
a7ddc9047b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  3. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -295,6 +295,8 @@ public enum Status {
DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"), DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"),
NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"), NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"),
NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{0}] does not support copy", "不支持复制的任务类型[{0}]"), NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{0}] does not support copy", "不支持复制的任务类型[{0}]"),
COMPLEMENT_PROCESS_INSTANCE_DATE_RANGE_ERROR(50059, "complement instances cannot be generated within the complement time range according to the schedule configuration",
"根据调度配置在补数时间范围内无法生成补数实例"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/** /**

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -23,6 +23,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService; import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
@ -144,17 +145,23 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/** /**
* create command * create command
*/ */
int create = this.createCommand(commandType, processDefinition.getCode(), try {
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), int create = this.createCommand(commandType, processDefinition.getCode(),
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber, dryRun); taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber, dryRun);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId); if (create > 0) {
processDefinitionMapper.updateById(processDefinition); processDefinition.setWarningGroupId(warningGroupId);
putMsg(result, Status.SUCCESS); processDefinitionMapper.updateById(processDefinition);
} else { putMsg(result, Status.SUCCESS);
putMsg(result, Status.START_PROCESS_INSTANCE_ERROR); } else {
putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
}
} catch (ServiceException e) {
Optional<Status> status = Status.findStatusBy(e.getCode());
putMsg(result, status.orElse(Status.START_PROCESS_INSTANCE_ERROR));
} }
return result; return result;
} }
@ -569,6 +576,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
int createCount = 0; int createCount = 0;
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam()); Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
LinkedList<Date> listDate = new LinkedList<>(CronUtils.getSelfFireDateList(start, end, schedules));
final int listDateSize = listDate.size();
if (listDateSize == 0) {
logger.warn("can't create complement command, because the fire date cannot be created, scope: {} ~ {}",
DateUtils.dateToString(start), DateUtils.dateToString(end));
throw new ServiceException(Status.COMPLEMENT_PROCESS_INSTANCE_DATE_RANGE_ERROR);
}
switch (runMode) { switch (runMode) {
case RUN_MODE_SERIAL: { case RUN_MODE_SERIAL: {
if (start.after(end)) { if (start.after(end)) {
@ -587,10 +602,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break; break;
} }
LinkedList<Date> listDate = new LinkedList<>();
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules));
int listDateSize = listDate.size();
createCount = listDate.size(); createCount = listDate.size();
if (!CollectionUtils.isEmpty(listDate)) { if (!CollectionUtils.isEmpty(listDate)) {
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java

@ -187,10 +187,6 @@ public class CronUtils {
*/ */
public static List<Date> getSelfFireDateList(final Date startTime, final Date endTime, final List<Schedule> schedules) { public static List<Date> getSelfFireDateList(final Date startTime, final Date endTime, final List<Schedule> schedules) {
List<Date> result = new ArrayList<>(); List<Date> result = new ArrayList<>();
if (startTime.equals(endTime)) {
result.add(startTime);
return result;
}
// support left closed and right closed time interval (startDate <= N <= endDate) // support left closed and right closed time interval (startDate <= N <= endDate)
Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS); Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS);

Loading…
Cancel
Save