|
|
|
@ -18,6 +18,7 @@
|
|
|
|
|
package org.apache.dolphinscheduler.api.service.impl; |
|
|
|
|
|
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; |
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; |
|
|
|
@ -77,6 +78,7 @@ import org.apache.commons.collections.CollectionUtils;
|
|
|
|
|
import org.apache.commons.collections.MapUtils; |
|
|
|
|
import org.apache.commons.lang.StringUtils; |
|
|
|
|
|
|
|
|
|
import java.time.ZoneId; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Date; |
|
|
|
|
import java.util.HashMap; |
|
|
|
@ -84,6 +86,7 @@ import java.util.HashSet;
|
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Set; |
|
|
|
|
import java.util.TimeZone; |
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
@ -704,14 +707,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
|
|
|
|
|
logger.warn("The startDate {} is later than the endDate {}", start, end); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); |
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); |
|
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
|
createCount = processService.createCommand(command); |
|
|
|
|
|
|
|
|
|
// dependent process definition
|
|
|
|
|
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); |
|
|
|
|
|
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start, ZoneId.systemDefault().getId())); |
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end, ZoneId.systemDefault().getId())); |
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, JSONUtils.toJsonString(CronUtils.getSelfFireDateList(start, end, schedules))); |
|
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
|
createCount = processService.createCommand(command); |
|
|
|
|
|
|
|
|
|
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { |
|
|
|
|
logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " |
|
|
|
|
+ "dependent complement data", command.getProcessDefinitionCode()); |
|
|
|
@ -727,6 +732,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
logger.info("before createComplementCommandList {} {}", start, end); |
|
|
|
|
|
|
|
|
|
List<Date> listDate = new ArrayList<>(); |
|
|
|
|
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); |
|
|
|
|
listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules)); |
|
|
|
@ -759,9 +766,21 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
|
|
|
|
|
endDateIndex += singleCommandItems; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex))); |
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex))); |
|
|
|
|
Date startDate = listDate.get(startDateIndex); |
|
|
|
|
Date endDate = listDate.get(endDateIndex); |
|
|
|
|
|
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(startDate, ZoneId.systemDefault().getId())); |
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(endDate, ZoneId.systemDefault().getId())); |
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, JSONUtils.toJsonString(CronUtils.getSelfFireDateList(startDate, endDate, schedules))); |
|
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
|
|
|
|
|
|
logger.info("before create complement command: startDate:{}, startStr:{}, endDate:{}, endStr:{}, complement date list: {}", |
|
|
|
|
listDate.get(startDateIndex), |
|
|
|
|
cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE), |
|
|
|
|
listDate.get(endDateIndex), |
|
|
|
|
cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE), |
|
|
|
|
cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)); |
|
|
|
|
|
|
|
|
|
processService.createCommand(command); |
|
|
|
|
|
|
|
|
|
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { |
|
|
|
|