|
|
@ -18,15 +18,16 @@ |
|
|
|
package org.apache.dolphinscheduler.api.service.impl; |
|
|
|
package org.apache.dolphinscheduler.api.service.impl; |
|
|
|
|
|
|
|
|
|
|
|
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; |
|
|
|
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; |
|
|
|
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; |
|
|
|
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; |
|
|
|
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; |
|
|
|
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; |
|
|
|
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; |
|
|
|
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.COMMA; |
|
|
|
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; |
|
|
|
import static org.apache.dolphinscheduler.common.constants.Constants.COMMA; |
|
|
|
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH; |
|
|
|
import static org.apache.dolphinscheduler.common.constants.Constants.MAX_TASK_TIMEOUT; |
|
|
|
|
|
|
|
import static org.apache.dolphinscheduler.common.constants.Constants.SCHEDULE_TIME_MAX_LENGTH; |
|
|
|
|
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; |
|
|
|
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; |
|
|
|
import org.apache.dolphinscheduler.api.enums.ExecuteType; |
|
|
|
import org.apache.dolphinscheduler.api.enums.ExecuteType; |
|
|
@ -35,7 +36,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException; |
|
|
|
import org.apache.dolphinscheduler.api.service.ExecutorService; |
|
|
|
import org.apache.dolphinscheduler.api.service.ExecutorService; |
|
|
|
import org.apache.dolphinscheduler.api.service.MonitorService; |
|
|
|
import org.apache.dolphinscheduler.api.service.MonitorService; |
|
|
|
import org.apache.dolphinscheduler.api.service.ProjectService; |
|
|
|
import org.apache.dolphinscheduler.api.service.ProjectService; |
|
|
|
import org.apache.dolphinscheduler.common.Constants; |
|
|
|
import org.apache.dolphinscheduler.common.constants.Constants; |
|
|
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
|
|
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
|
|
|
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; |
|
|
|
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; |
|
|
|
import org.apache.dolphinscheduler.common.enums.CycleEnum; |
|
|
|
import org.apache.dolphinscheduler.common.enums.CycleEnum; |
|
|
@ -276,8 +277,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
Map<String, String> cronMap = JSONUtils.toMap(cronTime); |
|
|
|
Map<String, String> cronMap = JSONUtils.toMap(cronTime); |
|
|
|
if (cronMap.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { |
|
|
|
if (cronMap.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { |
|
|
|
String[] stringDates = cronMap.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA); |
|
|
|
String[] stringDates = cronMap.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA); |
|
|
|
if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) { |
|
|
|
if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) { |
|
|
|
logger.warn("Parameter cornTime is bigger than {}.", SCHEDULE_TIME_MAX_LENGTH); |
|
|
|
logger.warn("Parameter cornTime is bigger than {}.", SCHEDULE_TIME_MAX_LENGTH); |
|
|
|
return false; |
|
|
|
return false; |
|
|
@ -335,7 +336,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ |
|
|
|
.filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())).forEach( |
|
|
|
.filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())).forEach( |
|
|
|
taskDefinition -> processDefinitionCodeSet.add(Long.valueOf( |
|
|
|
taskDefinition -> processDefinitionCodeSet.add(Long.valueOf( |
|
|
|
JSONUtils.getNodeString(taskDefinition.getTaskParams(), |
|
|
|
JSONUtils.getNodeString(taskDefinition.getTaskParams(), |
|
|
|
Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)))); |
|
|
|
CMD_PARAM_SUB_PROCESS_DEFINE_CODE)))); |
|
|
|
if (processDefinitionCodeSet.isEmpty()) { |
|
|
|
if (processDefinitionCodeSet.isEmpty()) { |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
@ -405,7 +406,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ |
|
|
|
}); |
|
|
|
}); |
|
|
|
String startParams = null; |
|
|
|
String startParams = null; |
|
|
|
if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) { |
|
|
|
if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) { |
|
|
|
Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS); |
|
|
|
Object startParamsJson = commandMap.get(CMD_PARAM_START_PARAMS); |
|
|
|
if (startParamsJson != null) { |
|
|
|
if (startParamsJson != null) { |
|
|
|
startParams = startParamsJson.toString(); |
|
|
|
startParams = startParamsJson.toString(); |
|
|
|
} |
|
|
|
} |
|
|
@ -811,21 +812,21 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ |
|
|
|
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; |
|
|
|
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; |
|
|
|
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam()); |
|
|
|
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam()); |
|
|
|
Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam); |
|
|
|
Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam); |
|
|
|
if (scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { |
|
|
|
if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { |
|
|
|
dateList = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); |
|
|
|
dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); |
|
|
|
dateList = removeDuplicates(dateList); |
|
|
|
dateList = removeDuplicates(dateList); |
|
|
|
} |
|
|
|
} |
|
|
|
if (scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey( |
|
|
|
if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey( |
|
|
|
CMDPARAM_COMPLEMENT_DATA_END_DATE)) { |
|
|
|
CMD_PARAM_COMPLEMENT_DATA_END_DATE)) { |
|
|
|
startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); |
|
|
|
startDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE); |
|
|
|
endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); |
|
|
|
endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE); |
|
|
|
} |
|
|
|
} |
|
|
|
switch (runMode) { |
|
|
|
switch (runMode) { |
|
|
|
case RUN_MODE_SERIAL: { |
|
|
|
case RUN_MODE_SERIAL: { |
|
|
|
logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.", |
|
|
|
logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.", |
|
|
|
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); |
|
|
|
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); |
|
|
|
if (StringUtils.isNotEmpty(dateList)) { |
|
|
|
if (StringUtils.isNotEmpty(dateList)) { |
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList); |
|
|
|
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList); |
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
logger.info("Creating command, commandInfo:{}.", command); |
|
|
|
logger.info("Creating command, commandInfo:{}.", command); |
|
|
|
createCount = commandService.createCommand(command); |
|
|
|
createCount = commandService.createCommand(command); |
|
|
@ -838,8 +839,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (startDate != null && endDate != null) { |
|
|
|
if (startDate != null && endDate != null) { |
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate); |
|
|
|
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startDate); |
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate); |
|
|
|
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endDate); |
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
logger.info("Creating command, commandInfo:{}.", command); |
|
|
|
logger.info("Creating command, commandInfo:{}.", command); |
|
|
|
createCount = commandService.createCommand(command); |
|
|
|
createCount = commandService.createCommand(command); |
|
|
@ -904,9 +905,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ |
|
|
|
endDateIndex += singleCommandItems; |
|
|
|
endDateIndex += singleCommandItems; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, |
|
|
|
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, |
|
|
|
DateUtils.dateToString(listDate.get(startDateIndex))); |
|
|
|
DateUtils.dateToString(listDate.get(startDateIndex))); |
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, |
|
|
|
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, |
|
|
|
DateUtils.dateToString(listDate.get(endDateIndex))); |
|
|
|
DateUtils.dateToString(listDate.get(endDateIndex))); |
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
logger.info("Creating command, commandInfo:{}.", command); |
|
|
|
logger.info("Creating command, commandInfo:{}.", command); |
|
|
@ -941,7 +942,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ |
|
|
|
logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", |
|
|
|
logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", |
|
|
|
createCount); |
|
|
|
createCount); |
|
|
|
for (List<String> stringDate : Lists.partition(listDate, createCount)) { |
|
|
|
for (List<String> stringDate : Lists.partition(listDate, createCount)) { |
|
|
|
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate)); |
|
|
|
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate)); |
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); |
|
|
|
logger.info("Creating command, commandInfo:{}.", command); |
|
|
|
logger.info("Creating command, commandInfo:{}.", command); |
|
|
|
if (commandService.createCommand(command) > 0) { |
|
|
|
if (commandService.createCommand(command) > 0) { |
|
|
@ -1054,14 +1055,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ |
|
|
|
if (scheduleResult == null) { |
|
|
|
if (scheduleResult == null) { |
|
|
|
return false; |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
if (scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { |
|
|
|
if (scheduleResult.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { |
|
|
|
if (scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null) { |
|
|
|
if (scheduleResult.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null) { |
|
|
|
return false; |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { |
|
|
|
if (scheduleResult.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE)) { |
|
|
|
String startDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); |
|
|
|
String startDate = scheduleResult.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE); |
|
|
|
String endDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); |
|
|
|
String endDate = scheduleResult.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE); |
|
|
|
if (startDate == null || endDate == null) { |
|
|
|
if (startDate == null || endDate == null) { |
|
|
|
return false; |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|