diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index f3f7ed2be6..5991e79f0a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -621,6 +621,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ Map cmdParam = JSONUtils.toMap(command.getCommandParam()); switch (runMode) { case RUN_MODE_SERIAL: { + if (start.after(end)) { + logger.warn("The startDate {} is later than the endDate {}", start, end); + break; + } cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); @@ -628,26 +632,45 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ break; } case RUN_MODE_PARALLEL: { + if (start.after(end)) { + logger.warn("The startDate {} is later than the endDate {}", start, end); + break; + } + LinkedList listDate = new LinkedList<>(); List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules)); + int listDateSize = listDate.size(); createCount = listDate.size(); if (!CollectionUtils.isEmpty(listDate)) { if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { createCount = Math.min(listDate.size(), expectedParallelismNumber); + if (listDateSize < createCount) { + createCount = listDateSize; + } } logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); - listDate.addLast(end); - int chunkSize = listDate.size() / createCount; - - for (int i = 0; i < createCount; i++) { - int rangeStart = i == 0 ? i : (i * chunkSize); - int rangeEnd = i == createCount - 1 ? listDate.size() - 1 - : rangeStart + chunkSize; - - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart))); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd))); + // Distribute the number of tasks equally to each command. + // The last command with insufficient quantity will be assigned to the remaining tasks. + int itemsPerCommand = (listDateSize / createCount); + int remainingItems = (listDateSize % createCount); + int startDateIndex = 0; + int endDateIndex = 0; + + for (int i = 1; i <= createCount; i++) { + int extra = (i <= remainingItems) ? 1 : 0; + int singleCommandItems = (itemsPerCommand + extra); + + if (i == 1) { + endDateIndex += singleCommandItems - 1; + } else { + startDateIndex = endDateIndex + 1; + endDateIndex += singleCommandItems; + } + + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex))); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex))); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); processService.createCommand(command); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index da942fbedf..90fb173017 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.api.service; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -32,6 +34,8 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -340,22 +344,35 @@ public class ExecutorServiceTest { listDate.add(1); listDate.add(2); listDate.add(3); + listDate.add(4); + int listDateSize = listDate.size(); int createCount = Math.min(listDate.size(), expectedParallelismNumber); logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); - listDate.addLast(4); - int chunkSize = listDate.size() / createCount; - for (int i = 0; i < createCount; i++) { - int rangeStart = i == 0 ? i : (i * chunkSize); - int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize; - logger.info("rangeStart:{}, rangeEnd:{}",rangeStart, rangeEnd); - result.add(listDate.get(rangeStart) + "," + listDate.get(rangeEnd)); + int itemsPerCommand = (listDateSize / createCount); + int remainingItems = (listDateSize % createCount); + int startDateIndex = 0; + int endDateIndex = 0; + + for (int i = 1; i <= createCount; i++) { + int extra = (i <= remainingItems) ? 1 : 0; + int singleCommandItems = (itemsPerCommand + extra); + + if (i == 1) { + endDateIndex += singleCommandItems - 1; + } else { + startDateIndex = endDateIndex + 1; + endDateIndex += singleCommandItems; + } + + logger.info("startDate:{}, endDate:{}", listDate.get(startDateIndex), listDate.get(endDateIndex)); + result.add(listDate.get(startDateIndex) + "," + listDate.get(endDateIndex)); } Assert.assertEquals("0,1", result.get(0)); - Assert.assertEquals("1,2", result.get(1)); - Assert.assertEquals("2,4", result.get(2)); - + Assert.assertEquals("2,3", result.get(1)); + Assert.assertEquals("4,4", result.get(2)); } + } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java index 3e7007a02a..d7847229ed 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java @@ -191,9 +191,9 @@ public class CronUtils { return result; } - // support left closed and right open time interval (startDate <= N < endDate) + // support left closed and right closed time interval (startDate <= N <= endDate) Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS); - Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS); + Date to = new Date(endTime.getTime() + Constants.SECOND_TIME_MILLIS); List listSchedule = new ArrayList<>(); listSchedule.addAll(schedules);