diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index bc7fb92eaa..faa4eb081c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -187,7 +187,7 @@ public class MasterSchedulerService extends Thread { if (processInstance.getTimeout() > 0) { this.processTimeoutCheckList.put(processInstance.getId(), processInstance); } - logger.info("command {} process {} start...", + logger.info("handle command end, command {} process {} start...", command.getId(), processInstance.getId()); masterExecService.execute(workflowExecuteThread); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index a977f0bb77..7b3b53508d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -438,9 +438,14 @@ public class WorkflowExecuteThread implements Runnable { endProcess(); int index = complementListDate.indexOf(scheduleDate); if (index >= complementListDate.size() - 1 || !processInstance.getState().typeIsSuccess()) { + logger.info("process complement end. process id:{}", processInstance.getId()); // complement data ends || no success return false; } + logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", + processInstance.getId(), + processInstance.getScheduleTime(), + complementListDate.toString()); scheduleDate = complementListDate.get(index + 1); //the next process complement processInstance.setId(0); @@ -555,16 +560,9 @@ public class WorkflowExecuteThread implements Runnable { } if (complementListDate.size() == 0 && needComplementProcess()) { - Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); - Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); - Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); - if (startDate.after(endDate)) { - Date tmp = startDate; - startDate = endDate; - endDate = tmp; - } - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); - complementListDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedules)); + complementListDate = processService.getComplementDateList( + JSONUtils.toMap(processInstance.getCommandParam()), + processInstance.getProcessDefinitionCode()); logger.info(" process definition code:{} complement data: {}", processInstance.getProcessDefinitionCode(), complementListDate.toString()); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index bf4f6f79ee..cbd4f387fb 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -104,6 +104,7 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; +import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.commons.lang.StringUtils; @@ -583,8 +584,16 @@ public class ProcessService { */ private Date getScheduleTime(Command command, Map cmdParam) { Date scheduleTime = command.getScheduleTime(); - if (scheduleTime == null && cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { - scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); + if (scheduleTime == null + && cmdParam != null + && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { + List complementDateList = getComplementDateList(cmdParam, command.getProcessDefinitionCode()); + if (complementDateList.size() > 0) { + scheduleTime = complementDateList.get(0); + } else { + logger.error("set scheduler time error: complement date list is empty, command: {}", + command.toString()); + } } return scheduleTime; } @@ -963,10 +972,11 @@ public class ProcessService { return; } - Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE), - YYYY_MM_DD_HH_MM_SS); - if (Flag.NO == processInstance.getIsSubProcess()) { - processInstance.setScheduleTime(startComplementTime); + List complementDate = getComplementDateList(cmdParam, processInstance.getProcessDefinitionCode()); + + if (complementDate.size() > 0 + && Flag.NO == processInstance.getIsSubProcess()) { + processInstance.setScheduleTime(complementDate.get(0)); } processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processDefinition.getGlobalParamMap(), @@ -975,6 +985,27 @@ public class ProcessService { } + /** + * return complement date list + * + * @param cmdParam + * @param processDefinitionCode + * @return + */ + public List getComplementDateList(Map cmdParam, Long processDefinitionCode) { + List result = new ArrayList<>(); + Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); + Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + if (startDate.after(endDate)) { + Date tmp = startDate; + startDate = endDate; + endDate = tmp; + } + List schedules = queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode); + result.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedules)); + return result; + } + /** * set sub work process parameters. * handle sub work process instance, update relation table and command parameters