diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index eb0f39e52d..a213573f8a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -551,66 +551,78 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ if (interval.length == 2) { start = DateUtils.getScheduleDate(interval[0]); end = DateUtils.getScheduleDate(interval[1]); + if (start.after(end)) { + logger.info("complement data error, wrong date start:{} and end date:{} ", + start, end + ); + return 0; + } } } // determine whether to complement if (commandType == CommandType.COMPLEMENT_DATA) { - runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; - if (null != start && null != end && !start.after(end)) { - if (runMode == RunMode.RUN_MODE_SERIAL) { - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - return processService.createCommand(command); - } else if (runMode == RunMode.RUN_MODE_PARALLEL) { - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineCode); - LinkedList listDate = new LinkedList<>(); - if (!CollectionUtils.isEmpty(schedules)) { - for (Schedule item : schedules) { - listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab())); - } - } - if (!CollectionUtils.isEmpty(listDate)) { - int effectThreadsCount = expectedParallelismNumber == null ? listDate.size() : Math.min(listDate.size(), expectedParallelismNumber); - logger.info("In parallel mode, current expectedParallelismNumber:{}", effectThreadsCount); - - int chunkSize = listDate.size() / effectThreadsCount; - listDate.addFirst(start); - listDate.addLast(end); - - for (int i = 0; i < effectThreadsCount; i++) { - int rangeStart = i == 0 ? i : (i * chunkSize); - int rangeEnd = i == effectThreadsCount - 1 ? listDate.size() - 1 - : rangeStart + chunkSize + 1; - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart))); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd))); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - processService.createCommand(command); - } - - return effectThreadsCount; - } else { - // loop by day - int runCunt = 0; - while (!start.after(end)) { - runCunt += 1; - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start)); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - processService.createCommand(command); - start = DateUtils.getSomeDay(start, 1); - } - return runCunt; - } - } - } else { - logger.error("there is not valid schedule date for the process definition code:{}", processDefineCode); + if (start == null || end == null) { + return 0; } + return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber); } else { command.setCommandParam(JSONUtils.toJsonString(cmdParam)); return processService.createCommand(command); } + } - return 0; + /** + * create complement command + * close left open right + * + * @param start + * @param end + * @param runMode + * @return + */ + private int createComplementCommandList(Date start, Date end, RunMode runMode, Command command, Integer expectedParallelismNumber) { + int createCount = 0; + runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; + Map cmdParam = JSONUtils.toMap(command.getCommandParam()); + switch (runMode) { + case RUN_MODE_SERIAL: { + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + createCount = processService.createCommand(command); + break; + } + case RUN_MODE_PARALLEL: { + LinkedList listDate = new LinkedList<>(); + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); + listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules)); + createCount = listDate.size(); + if (!CollectionUtils.isEmpty(listDate)) { + if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { + createCount = Math.min(listDate.size(), expectedParallelismNumber); + } + logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); + int chunkSize = listDate.size() / createCount; + + for (int i = 0; i < createCount; i++) { + int rangeStart = i == 0 ? i : (i * chunkSize); + int rangeEnd = i == createCount - 1 ? listDate.size() - 1 + : rangeStart + chunkSize; + if (rangeEnd == listDate.size()) { + rangeEnd = listDate.size() - 1; + } + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart))); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd))); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + processService.createCommand(command); + } + } + break; + } + default: + break; + } + logger.info("create complement command count: {}", createCount); + return createCount; } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 4f9aca5df7..417ce59de9 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -449,6 +449,11 @@ public final class Constants { */ public static final String CMDPARAM_COMPLEMENT_DATA_END_DATE = "complementEndDate"; + /** + * complement date default cron string + */ + public static final String DEFAULT_CRON_STRING = "0 0 0 * * ? *"; + /** * data source config @@ -503,6 +508,11 @@ public final class Constants { */ public static final int SLEEP_TIME_MILLIS = 1000; + /** + * one second mils + */ + public static final int SECOND_TIME_MILLIS = 1000; + /** * master task instance cache-database refresh interval */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 2e151cb460..9de1a0b0ce 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -436,11 +436,15 @@ public class WorkflowExecuteThread implements Runnable { scheduleDate = complementListDate.get(0); } else if (processInstance.getState().typeIsFinished()) { endProcess(); + if (complementListDate.size() <= 0) { + logger.info("process complement end. process id:{}", processInstance.getId()); + return true; + } int index = complementListDate.indexOf(scheduleDate); if (index >= complementListDate.size() - 1 || !processInstance.getState().typeIsSuccess()) { logger.info("process complement end. process id:{}", processInstance.getId()); // complement data ends || no success - return false; + return true; } logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", processInstance.getId(), @@ -559,14 +563,19 @@ public class WorkflowExecuteThread implements Runnable { } } - if (complementListDate.size() == 0 && needComplementProcess()) { - complementListDate = processService.getComplementDateList( - JSONUtils.toMap(processInstance.getCommandParam()), - processInstance.getProcessDefinitionCode()); - logger.info(" process definition code:{} complement data: {}", - processInstance.getProcessDefinitionCode(), complementListDate.toString()); + if (processInstance.isComplementData() && complementListDate.size() == 0) { + Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); + if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { + Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); + Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); + if (complementListDate.size() == 0 && needComplementProcess()) { + complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); + logger.info(" process definition code:{} complement data: {}", + processInstance.getProcessDefinitionCode(), complementListDate.toString()); + } + } } - } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index c1e392b758..c4fce3e402 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -587,7 +587,12 @@ public class ProcessService { if (scheduleTime == null && cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { - List complementDateList = getComplementDateList(cmdParam, command.getProcessDefinitionCode()); + + Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); + Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + List schedules = queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); + List complementDateList = CronUtils.getSelfFireDateList(start, end, schedules); + if (complementDateList.size() > 0) { scheduleTime = complementDateList.get(0); } else { @@ -972,7 +977,10 @@ public class ProcessService { return; } - List complementDate = getComplementDateList(cmdParam, processInstance.getProcessDefinitionCode()); + Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); + Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + List listSchedules = queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); + List complementDate = CronUtils.getSelfFireDateList(start, end, listSchedules); if (complementDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) { @@ -982,28 +990,6 @@ public class ProcessService { processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); - - } - - /** - * return complement date list - * - * @param cmdParam - * @param processDefinitionCode - * @return - */ - public List getComplementDateList(Map cmdParam, Long processDefinitionCode) { - List result = new ArrayList<>(); - Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); - Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); - if (startDate.after(endDate)) { - Date tmp = startDate; - startDate = endDate; - endDate = tmp; - } - List schedules = queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode); - result.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedules)); - return result; } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java index 418d063d4e..ab9a97b7ca 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java @@ -25,6 +25,7 @@ import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.week; import static com.cronutils.model.CronType.QUARTZ; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; @@ -182,18 +183,20 @@ public class CronUtils { * gets all scheduled times for a period of time based on self dependency * if schedulers is empty then default scheduler = 1 day */ - public static List getSelfFireDateList(Date startTime, Date endTime, List schedules) { + public static List getSelfFireDateList(final Date startTime, final Date endTime, final List schedules) { List result = new ArrayList<>(); - if (!CollectionUtils.isEmpty(schedules)) { - for (Schedule schedule : schedules) { - result.addAll(CronUtils.getSelfFireDateList(startTime, endTime, schedule.getCrontab())); - } - } else { - Date start = startTime; - for (int i = 0; start.before(endTime); i++) { - start = DateUtils.getSomeDay(startTime, i); - result.add(start); - } + Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS); + Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS); + + List listSchedule = new ArrayList<>(); + listSchedule.addAll(schedules); + if (CollectionUtils.isEmpty(listSchedule)) { + Schedule schedule = new Schedule(); + schedule.setCrontab(Constants.DEFAULT_CRON_STRING); + listSchedule.add(schedule); + } + for (Schedule schedule : listSchedule) { + result.addAll(CronUtils.getSelfFireDateList(from, to, schedule.getCrontab())); } return result; } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java index b4f864c5b4..55cc19d338 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtilsTest.java @@ -167,7 +167,7 @@ public class CronUtilsTest { } @Test - public void getSelfFireDateList() throws ParseException{ + public void getSelfFireDateList() throws ParseException { Date from = DateUtils.stringToDate("2020-01-01 00:00:00"); Date to = DateUtils.stringToDate("2020-01-31 00:00:00"); // test date @@ -179,6 +179,15 @@ public class CronUtilsTest { // test other Assert.assertEquals(30, CronUtils.getFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? ")).size()); Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? "), 5).size()); + from = DateUtils.stringToDate("2020-01-01 00:02:00"); + to = DateUtils.stringToDate("2020-01-01 00:02:00"); + Assert.assertEquals(1, CronUtils.getFireDateList(new Date(from.getTime() - 1000), to, CronUtils.parse2CronExpression("0 * * * * ? ")).size()); + + from = DateUtils.stringToDate("2020-01-01 00:02:00"); + to = DateUtils.stringToDate("2020-01-01 00:04:00"); + Assert.assertEquals(2, CronUtils.getFireDateList(new Date(from.getTime() - 1000), + new Date(to.getTime() - 1000), + CronUtils.parse2CronExpression("0 * * * * ? ")).size()); } @Test