From 88690462b4b93f8df01ba1952374cdf00c02eda5 Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Mon, 11 Oct 2021 20:03:25 +0800 Subject: [PATCH] [Fix-6478] [Server] Fix the lack of scheduling time in complement parallelism mode (#6491) * fix missing history data * update code style * update ExecutorServiceTest test class. --- .../api/service/impl/ExecutorServiceImpl.java | 6 ++-- .../api/service/ExecutorServiceTest.java | 31 +++++++++++++++++++ .../service/quartz/cron/CronUtils.java | 1 + 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index b910c4ba01..5042a03714 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -604,15 +604,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ createCount = Math.min(listDate.size(), expectedParallelismNumber); } logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); + + listDate.addLast(end); int chunkSize = listDate.size() / createCount; for (int i = 0; i < createCount; i++) { int rangeStart = i == 0 ? i : (i * chunkSize); int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize; - if (rangeEnd == listDate.size()) { - rangeEnd = listDate.size() - 1; - } + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart))); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd))); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index e308f58443..9766c61dff 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -58,12 +58,15 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * executor service 2 test */ @RunWith(MockitoJUnitRunner.Silent.class) public class ExecutorServiceTest { + private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class); @InjectMocks private ExecutorServiceImpl executorService; @@ -326,4 +329,32 @@ public class ExecutorServiceTest { result.put(Constants.STATUS, Status.SUCCESS); return result; } + + @Test + public void testCreateComplementToParallel() { + List result = new ArrayList<>(); + int expectedParallelismNumber = 3; + LinkedList listDate = new LinkedList<>(); + listDate.add(0); + listDate.add(1); + listDate.add(2); + listDate.add(3); + + int createCount = Math.min(listDate.size(), expectedParallelismNumber); + logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); + + listDate.addLast(4); + int chunkSize = listDate.size() / createCount; + for (int i = 0; i < createCount; i++) { + int rangeStart = i == 0 ? i : (i * chunkSize); + int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize; + logger.info("rangeStart:{}, rangeEnd:{}",rangeStart, rangeEnd); + result.add(listDate.get(rangeStart) + "," + listDate.get(rangeEnd)); + } + + Assert.assertEquals("0,1", result.get(0)); + Assert.assertEquals("1,2", result.get(1)); + Assert.assertEquals("2,4", result.get(2)); + + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java index f23da2620f..f195d62f0f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java @@ -190,6 +190,7 @@ public class CronUtils { return result; } + // support left closed and right open time interval (startDate <= N < endDate) Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS); Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS);