diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java index bdefefe026..4c3124a104 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java @@ -193,7 +193,7 @@ public class BaseServiceImpl implements BaseService { Map result = new HashMap<>(); Date start = null; if (!StringUtils.isEmpty(startDateStr)) { - start = DateUtils.getScheduleDate(startDateStr); + start = DateUtils.stringToDate(startDateStr); if (Objects.isNull(start)) { putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); return result; @@ -203,7 +203,7 @@ public class BaseServiceImpl implements BaseService { Date end = null; if (!StringUtils.isEmpty(endDateStr)) { - end = DateUtils.getScheduleDate(endDateStr); + end = DateUtils.stringToDate(endDateStr); if (Objects.isNull(end)) { putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java index f4f544bae6..091387203e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java @@ -166,8 +166,8 @@ public class DataAnalysisServiceImpl extends BaseServiceImpl implements DataAnal Date start = null; Date end = null; if (!StringUtils.isEmpty(startDate) && !StringUtils.isEmpty(endDate)) { - start = DateUtils.getScheduleDate(startDate); - end = DateUtils.getScheduleDate(endDate); + start = DateUtils.stringToDate(startDate); + end = DateUtils.stringToDate(endDate); if (Objects.isNull(start) || Objects.isNull(end)) { putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java index c5ee6363a5..ae14e9e4af 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java @@ -66,10 +66,10 @@ public class DqExecuteResultServiceImpl extends BaseServiceImpl implements DqExe Date end = null; try { if (StringUtils.isNotEmpty(startTime)) { - start = DateUtils.getScheduleDate(startTime); + start = DateUtils.stringToDate(startTime); } if (StringUtils.isNotEmpty(endTime)) { - end = DateUtils.getScheduleDate(endTime); + end = DateUtils.stringToDate(endTime); } } catch (Exception e) { putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startTime,endTime"); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java index a4f78ec830..9c398d8609 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java @@ -163,10 +163,10 @@ public class DqRuleServiceImpl extends BaseServiceImpl implements DqRuleService Date end = null; try { if (StringUtils.isNotEmpty(startTime)) { - start = DateUtils.getScheduleDate(startTime); + start = DateUtils.stringToDate(startTime); } if (StringUtils.isNotEmpty(endTime)) { - end = DateUtils.getScheduleDate(endTime); + end = DateUtils.stringToDate(endTime); } } catch (Exception e) { putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startTime,endTime"); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 0e7d1b5abb..e8092ee9fa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -17,12 +17,17 @@ package org.apache.dolphinscheduler.api.service.impl; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.Lists; -import org.apache.commons.beanutils.BeanUtils; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.COMMA; +import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; +import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH; + import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; @@ -66,16 +71,18 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.service.corn.CronUtils; +import org.apache.dolphinscheduler.service.cron.CronUtils; +import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -83,16 +90,13 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.COMMA; -import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; -import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Lists; /** * executor service impl @@ -135,38 +139,39 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * execute process instance * - * @param loginUser login user - * @param projectCode project code - * @param processDefinitionCode process definition code - * @param cronTime cron time - * @param commandType command type - * @param failureStrategy failure strategy - * @param startNodeList start nodelist - * @param taskDependType node dependency type - * @param warningType warning type - * @param warningGroupId notify group id - * @param processInstancePriority process instance priority - * @param workerGroup worker group name - * @param environmentCode environment code - * @param runMode run mode - * @param timeout timeout - * @param startParams the global param values which pass to new process instance + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCode process definition code + * @param cronTime cron time + * @param commandType command type + * @param failureStrategy failure strategy + * @param startNodeList start nodelist + * @param taskDependType node dependency type + * @param warningType warning type + * @param warningGroupId notify group id + * @param processInstancePriority process instance priority + * @param workerGroup worker group name + * @param environmentCode environment code + * @param runMode run mode + * @param timeout timeout + * @param startParams the global param values which pass to new process instance * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode * @return execute process instance code */ @Override - public Map execProcessInstance(User loginUser, long projectCode, - long processDefinitionCode, String cronTime, CommandType commandType, + public Map execProcessInstance(User loginUser, long projectCode, long processDefinitionCode, + String cronTime, CommandType commandType, FailureStrategy failureStrategy, String startNodeList, - TaskDependType taskDependType, WarningType warningType, int warningGroupId, - RunMode runMode, - Priority processInstancePriority, String workerGroup, Long environmentCode,Integer timeout, + TaskDependType taskDependType, WarningType warningType, + int warningGroupId, RunMode runMode, + Priority processInstancePriority, String workerGroup, + Long environmentCode, Integer timeout, Map startParams, Integer expectedParallelismNumber, - int dryRun, - ComplementDependentMode complementDependentMode) { + int dryRun, ComplementDependentMode complementDependentMode) { Project project = projectMapper.queryByCode(projectCode); //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_START); + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_START); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -185,12 +190,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ if (!checkTenantSuitable(processDefinition)) { logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ", - processDefinition.getId(), processDefinition.getName()); + processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.TENANT_NOT_SUITABLE); return result; } - if(!checkScheduleTimeNum(commandType,cronTime)){ + if (!checkScheduleTimeNum(commandType, cronTime)) { putMsg(result, Status.SCHEDULE_TIME_NUMBER); return result; } @@ -202,10 +207,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * create command */ - int create = this.createCommand(commandType, processDefinition.getCode(), - taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), - warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, - expectedParallelismNumber, dryRun, complementDependentMode); + int create = + this.createCommand(commandType, processDefinition.getCode(), taskDependType, failureStrategy, startNodeList, + cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, + environmentCode, startParams, expectedParallelismNumber, dryRun, complementDependentMode); if (create > 0) { processDefinition.setWarningGroupId(warningGroupId); @@ -236,19 +241,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } /** - * * @param complementData * @param cronTime * @return CommandType is COMPLEMENT_DATA and cronTime's number is not greater than 100 return true , otherwise return false */ - private boolean checkScheduleTimeNum(CommandType complementData,String cronTime) { + private boolean checkScheduleTimeNum(CommandType complementData, String cronTime) { if (!CommandType.COMPLEMENT_DATA.equals(complementData)) { return true; } - if(cronTime == null){ + if (cronTime == null) { return true; } - Map cronMap = JSONUtils.toMap(cronTime); + Map cronMap = JSONUtils.toMap(cronTime); if (cronMap.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { String[] stringDates = cronMap.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA); if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) { @@ -261,14 +265,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * check whether the process definition can be executed * - * @param projectCode project code + * @param projectCode project code * @param processDefinition process definition * @param processDefineCode process definition code - * @param version process instance verison + * @param version process instance verison * @return check result code */ @Override - public Map checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode, Integer version) { + public Map checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, + long processDefineCode, Integer version) { Map result = new HashMap<>(); if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { // check process definition exists @@ -287,41 +292,47 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * check if the current process has subprocesses and all subprocesses are valid + * * @param processDefinition * @return check result */ @Override public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition) { // query all subprocesses under the current process - List processTaskRelations = processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode()); + List processTaskRelations = + processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode()); if (processTaskRelations.isEmpty()) { return true; } - Set relationCodes = processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet()); + Set relationCodes = + processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet()); List taskDefinitions = taskDefinitionMapper.queryByCodeList(relationCodes); // find out the process definition code Set processDefinitionCodeSet = new HashSet<>(); taskDefinitions.stream() - .filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())) - .forEach(taskDefinition -> processDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString(taskDefinition.getTaskParams(), Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)))); + .filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())).forEach( + taskDefinition -> processDefinitionCodeSet.add(Long.valueOf( + JSONUtils.getNodeString(taskDefinition.getTaskParams(), Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)))); if (processDefinitionCodeSet.isEmpty()) { return true; } // check sub releaseState List processDefinitions = processDefinitionMapper.queryByCodes(processDefinitionCodeSet); - return processDefinitions.stream().filter(definition -> definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet()).isEmpty(); + return processDefinitions.stream() + .filter(definition -> definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet()) + .isEmpty(); } /** * do action to process instanceļ¼špause, stop, repeat, recover from pause, recover from stop * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param processInstanceId process instance id - * @param executeType execute type + * @param executeType execute type * @return execute result code */ @Override @@ -329,7 +340,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ Project project = projectMapper.queryByCode(projectCode); //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, ApiFuncIdentificationConstant.map.get(executeType)); + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, + ApiFuncIdentificationConstant.map.get(executeType)); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -345,10 +357,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), + ProcessDefinition processDefinition = + processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) { - result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); + result = + checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -360,12 +375,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } if (!checkTenantSuitable(processDefinition)) { logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ", - processDefinition.getId(), processDefinition.getName()); + processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.TENANT_NOT_SUITABLE); } //get the startParams user specified at the first starting while repeat running is needed - Map commandMap = JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference>() {}); + Map commandMap = + JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference>() { + }); String startParams = null; if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) { Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS); @@ -376,19 +393,24 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ switch (executeType) { case REPEAT_RUNNING: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), + processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams); break; case RECOVER_SUSPENDED_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), + processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); break; case START_FAILURE_TASK_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), + processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams); break; case STOP: if (processInstance.getState() == ExecutionStatus.READY_STOP) { - putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); + putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), + processInstance.getState()); } else { - result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP); + result = + updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP); } break; case PAUSE: @@ -432,8 +454,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @return true if tenant suitable, otherwise return false */ private boolean checkTenantSuitable(ProcessDefinition processDefinition) { - Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(), - processDefinition.getUserId()); + Tenant tenant = + processService.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId()); return tenant != null; } @@ -441,7 +463,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * Check the state of process instance and the type of operation match * * @param processInstance process instance - * @param executeType execute type + * @param executeType execute type * @return check result code */ private Map checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) { @@ -475,7 +497,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ break; } if (!checkResult) { - putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString()); + putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), + executionStatus.toString(), executeType.toString()); } else { putMsg(result, Status.SUCCESS); } @@ -486,11 +509,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * prepare to update process instance command type and status * * @param processInstance process instance - * @param commandType command type + * @param commandType command type * @param executionStatus execute status * @return update result */ - private Map updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) { + private Map updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, + ExecutionStatus executionStatus) { Map result = new HashMap<>(); processInstance.setCommandType(commandType); @@ -528,8 +552,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ taskGroupQueue.setForceStart(Flag.YES.getCode()); processService.updateTaskGroupQueue(taskGroupQueue); - processService.sendStartTask2Master(processInstance, taskGroupQueue.getTaskId() - ,org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST); + processService.sendStartTask2Master(processInstance, taskGroupQueue.getTaskId(), + org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST); putMsg(result, Status.SUCCESS); return result; } @@ -537,14 +561,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution * - * @param loginUser login user - * @param instanceId instance id + * @param loginUser login user + * @param instanceId instance id * @param processDefinitionCode process definition code * @param processVersion - * @param commandType command type + * @param commandType command type * @return insert result code */ - private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) { + private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, + int processVersion, CommandType commandType, String startParams) { Map result = new HashMap<>(); //To add startParams only when repeat running is needed @@ -607,8 +632,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ */ if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) { putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName()); - logger.info("not release process definition id: {} , name : {}", - processDefinitionTmp.getId(), processDefinitionTmp.getName()); + logger.info("not release process definition id: {} , name : {}", processDefinitionTmp.getId(), + processDefinitionTmp.getName()); return result; } } @@ -621,27 +646,27 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * create command * - * @param commandType commandType - * @param processDefineCode processDefineCode - * @param nodeDep nodeDep - * @param failureStrategy failureStrategy - * @param startNodeList startNodeList - * @param schedule schedule - * @param warningType warningType - * @param executorId executorId - * @param warningGroupId warningGroupId - * @param runMode runMode + * @param commandType commandType + * @param processDefineCode processDefineCode + * @param nodeDep nodeDep + * @param failureStrategy failureStrategy + * @param startNodeList startNodeList + * @param schedule schedule + * @param warningType warningType + * @param executorId executorId + * @param warningGroupId warningGroupId + * @param runMode runMode * @param processInstancePriority processInstancePriority - * @param workerGroup workerGroup - * @param environmentCode environmentCode + * @param workerGroup workerGroup + * @param environmentCode environmentCode * @return command id */ - private int createCommand(CommandType commandType, long processDefineCode, - TaskDependType nodeDep, FailureStrategy failureStrategy, - String startNodeList, String schedule, WarningType warningType, - int executorId, int warningGroupId, - RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, - Map startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) { + private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep, + FailureStrategy failureStrategy, String startNodeList, String schedule, + WarningType warningType, int executorId, int warningGroupId, RunMode runMode, + Priority processInstancePriority, String workerGroup, Long environmentCode, + Map startParams, Integer expectedParallelismNumber, int dryRun, + ComplementDependentMode complementDependentMode) { /** * instantiate command schedule instance @@ -689,11 +714,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ if (schedule == null || StringUtils.isEmpty(schedule)) { return 0; } - int check = checkScheduleTime(schedule); - if(check == 0){ + if (!isValidateScheduleTime(schedule)) { + return 0; + } + try { + return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, + complementDependentMode); + } catch (CronParseException cronParseException) { + // this just make compile happy, since we already validate the cron before + logger.error("Parse cron error", cronParseException); return 0; } - return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, complementDependentMode); } else { command.setCommandParam(JSONUtils.toJsonString(cmdParam)); return processService.createCommand(command); @@ -709,7 +740,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @return */ protected int createComplementCommandList(String scheduleTimeParam, RunMode runMode, Command command, - Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) { + Integer expectedParallelismNumber, + ComplementDependentMode complementDependentMode) + throws CronParseException { int createCount = 0; String startDate = null; String endDate = null; @@ -718,33 +751,35 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; Map cmdParam = JSONUtils.toMap(command.getCommandParam()); Map scheduleParam = JSONUtils.toMap(scheduleTimeParam); - if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ + if (scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { dateList = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); dateList = removeDuplicates(dateList); } - if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){ + if (scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey( + CMDPARAM_COMPLEMENT_DATA_END_DATE)) { startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); } switch (runMode) { case RUN_MODE_SERIAL: { - if(StringUtils.isNotEmpty(dateList)){ + if (StringUtils.isNotEmpty(dateList)) { cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); createCount = processService.createCommand(command); } - if(startDate != null && endDate != null){ + if (startDate != null && endDate != null) { cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); createCount = processService.createCommand(command); // dependent process definition - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( + command.getProcessDefinitionCode()); if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " - + "dependent complement data", command.getProcessDefinitionCode()); + + "dependent complement data", command.getProcessDefinitionCode()); } else { dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); } @@ -752,10 +787,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ break; } case RUN_MODE_PARALLEL: { - if(startDate != null && endDate != null){ - List listDate = new ArrayList<>(); - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); - listDate.addAll(CronUtils.getSelfFireDateList(DateUtils.getScheduleDate(startDate), DateUtils.getScheduleDate(endDate), schedules)); + if (startDate != null && endDate != null) { + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( + command.getProcessDefinitionCode()); + List listDate = new ArrayList<>( + CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate), + DateUtils.stringToZoneDateTime(endDate), schedules)); int listDateSize = listDate.size(); createCount = listDate.size(); if (!CollectionUtils.isEmpty(listDate)) { @@ -791,15 +828,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ processService.createCommand(command); if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { - logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " + logger.info( + "process code: {} complement dependent in off mode or schedule's size is 0, skip " + "dependent complement data", command.getProcessDefinitionCode()); } else { - dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); + dependentProcessDefinitionCreateCount += + createComplementDependentCommand(schedules, command); } } } } - if(StringUtils.isNotEmpty(dateList)){ + if (StringUtils.isNotEmpty(dateList)) { List listDate = Arrays.asList(dateList.split(COMMA)); int listDateSize = listDate.size(); createCount = listDate.size(); @@ -823,8 +862,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ default: break; } - logger.info("create complement command count: {}, create dependent complement command count: {}", createCount - , dependentProcessDefinitionCreateCount); + logger.info("create complement command count: {}, create dependent complement command count: {}", createCount, + dependentProcessDefinitionCreateCount); return createCount; } @@ -843,9 +882,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } List dependentProcessDefinitionList = - getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), - CronUtils.getMaxCycle(schedules.get(0).getCrontab()), - dependentCommand.getWorkerGroup()); + getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), + CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup()); dependentCommand.setTaskDependType(TaskDependType.TASK_POST); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { @@ -864,33 +902,36 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * get complement dependent process definition list */ private List getComplementDependentDefinitionList(long processDefinitionCode, - CycleEnum processDefinitionCycle, - String workerGroup) { + CycleEnum processDefinitionCycle, + String workerGroup) { List dependentProcessDefinitionList = - processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); + processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); - return checkDependentProcessDefinitionValid(dependentProcessDefinitionList,processDefinitionCycle,workerGroup); + return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle, + workerGroup); } /** - * Check whether the dependency cycle of the dependent node is consistent with the schedule cycle of - * the dependent process definition and if there is no worker group in the schedule, use the complement selection's - * worker group + * Check whether the dependency cycle of the dependent node is consistent with the schedule cycle of + * the dependent process definition and if there is no worker group in the schedule, use the complement selection's + * worker group */ - private List checkDependentProcessDefinitionValid(List dependentProcessDefinitionList, - CycleEnum processDefinitionCycle, - String workerGroup) { + private List checkDependentProcessDefinitionValid( + List dependentProcessDefinitionList, CycleEnum processDefinitionCycle, + String workerGroup) { List validDependentProcessDefinitionList = new ArrayList<>(); - List processDefinitionCodeList = dependentProcessDefinitionList.stream() - .map(DependentProcessDefinition::getProcessDefinitionCode) + List processDefinitionCodeList = + dependentProcessDefinitionList.stream().map(DependentProcessDefinition::getProcessDefinitionCode) .collect(Collectors.toList()); - Map processDefinitionWorkerGroupMap = processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList); + Map processDefinitionWorkerGroupMap = + processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) { - if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) { + if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode()) + == null) { dependentProcessDefinition.setWorkerGroup(workerGroup); } @@ -902,52 +943,51 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } /** - * * @param schedule * @return check error return 0 otherwish 1 */ - private int checkScheduleTime(String schedule){ - Date start = null; - Date end = null; - Map scheduleResult = JSONUtils.toMap(schedule); - if(scheduleResult == null){ - return 0; - } - if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ - if(scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null){ - return 0; + private boolean isValidateScheduleTime(String schedule) { + Map scheduleResult = JSONUtils.toMap(schedule); + if (scheduleResult == null) { + return false; + } + if (scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { + if (scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null) { + return false; } } - if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){ + if (scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { String startDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); String endDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); if (startDate == null || endDate == null) { - return 0; - } - start = DateUtils.getScheduleDate(startDate); - end = DateUtils.getScheduleDate(endDate); - if(start == null || end == null){ - return 0; + return false; } - if (start.after(end)) { - logger.error("complement data error, wrong date start:{} and end date:{} ", - start, end - ); - return 0; + try { + ZonedDateTime start = DateUtils.stringToZoneDateTime(startDate); + ZonedDateTime end = DateUtils.stringToZoneDateTime(endDate); + if (start == null || end == null) { + return false; + } + if (start.isAfter(end)) { + logger.error("complement data error, wrong date start:{} and end date:{} ", start, end); + return false; + } + } catch (Exception ex) { + logger.warn("Parse schedule time error, startDate: {}, endDate: {}", startDate, endDate); + return false; } } - return 1; + return true; } /** - * * @param scheduleTimeList * @return remove duplicate date list */ - private String removeDuplicates(String scheduleTimeList){ + private String removeDuplicates(String scheduleTimeList) { HashSet removeDate = new HashSet(); List resultList = new ArrayList(); - if(StringUtils.isNotEmpty(scheduleTimeList)){ + if (StringUtils.isNotEmpty(scheduleTimeList)) { String[] dateArrays = scheduleTimeList.split(COMMA); List dateList = Arrays.asList(dateArrays); removeDate.addAll(dateList); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 38d5893247..19f4df3333 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -560,7 +560,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout, String timezone) { Date schedule = processInstance.getScheduleTime(); if (scheduleTime != null) { - schedule = DateUtils.getScheduleDate(scheduleTime); + schedule = DateUtils.stringToDate(scheduleTime); } processInstance.setScheduleTime(schedule); List globalParamList = JSONUtils.toList(globalParams, Property.class); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index 5457872d18..983efa7a11 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -46,28 +46,31 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; -import org.apache.dolphinscheduler.service.corn.CronUtils; +import org.apache.dolphinscheduler.service.cron.CronUtils; +import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.lang3.StringUtils; -import java.text.ParseException; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TimeZone; +import java.util.stream.Collectors; -import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.cronutils.model.Cron; /** * scheduler service impl @@ -108,16 +111,16 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe /** * save schedule * - * @param loginUser login user - * @param projectCode project name - * @param processDefineCode process definition code - * @param schedule scheduler - * @param warningType warning type - * @param warningGroupId warning group id - * @param failureStrategy failure strategy + * @param loginUser login user + * @param projectCode project name + * @param processDefineCode process definition code + * @param schedule scheduler + * @param warningType warning type + * @param warningGroupId warning group id + * @param failureStrategy failure strategy * @param processInstancePriority process instance priority - * @param workerGroup worker group - * @param environmentCode environment code + * @param workerGroup worker group + * @param environmentCode environment code * @return create result code */ @Override @@ -138,14 +141,15 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe Project project = projectMapper.queryByCode(projectCode); // check project auth - boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result,null); + boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null); if (!hasProjectAndPerm) { return result; } // check work flow define release state ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode); - result = executorService.checkProcessDefinitionValid(projectCode,processDefinition, processDefineCode, processDefinition.getVersion()); + result = executorService.checkProcessDefinitionValid(projectCode, processDefinition, processDefineCode, + processDefinition.getVersion()); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -209,15 +213,15 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe /** * updateProcessInstance schedule * - * @param loginUser login user - * @param projectCode project code - * @param id scheduler id - * @param scheduleExpression scheduler - * @param warningType warning type - * @param warningGroupId warning group id - * @param failureStrategy failure strategy - * @param workerGroup worker group - * @param environmentCode environment code + * @param loginUser login user + * @param projectCode project code + * @param id scheduler id + * @param scheduleExpression scheduler + * @param warningType warning type + * @param warningGroupId warning group id + * @param failureStrategy failure strategy + * @param workerGroup worker group + * @param environmentCode environment code * @param processInstancePriority process instance priority * @return update result code */ @@ -238,7 +242,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe Project project = projectMapper.queryByCode(projectCode); // check project auth - boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result,null); + boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null); if (!hasProjectAndPerm) { return result; } @@ -265,9 +269,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe /** * set schedule online or offline * - * @param loginUser login user - * @param projectCode project code - * @param id scheduler id + * @param loginUser login user + * @param projectCode project code + * @param id scheduler id * @param scheduleStatus schedule status * @return publish result code */ @@ -281,7 +285,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe Project project = projectMapper.queryByCode(projectCode); // check project auth - boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result,null); + boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null); if (!hasProjectAndPerm) { return result; } @@ -296,7 +300,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe // check schedule release state if (scheduleObj.getReleaseState() == scheduleStatus) { logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}", - scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus); + scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus); putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus); return result; } @@ -313,8 +317,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe if (scheduleStatus == ReleaseState.ONLINE) { // check process definition release state if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { - logger.info("not release process definition id: {} , name : {}", - processDefinition.getId(), processDefinition.getName()); + logger.info("not release process definition id: {} , name : {}", processDefinition.getId(), + processDefinition.getName()); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); return result; } @@ -323,7 +327,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes); if (!subProcessDefineCodes.isEmpty()) { List subProcessDefinitionList = - processDefinitionMapper.queryByCodes(subProcessDefineCodes); + processDefinitionMapper.queryByCodes(subProcessDefineCodes); if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) { for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) { /** @@ -331,8 +335,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe */ if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) { logger.info("not release process definition id: {} , name : {}", - subProcessDefinition.getId(), subProcessDefinition.getName()); - putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId())); + subProcessDefinition.getId(), subProcessDefinition.getName()); + putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, + String.valueOf(subProcessDefinition.getId())); return result; } } @@ -379,17 +384,17 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe /** * query schedule * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param processDefineCode process definition code - * @param pageNo page number - * @param pageSize page size - * @param searchVal search value + * @param pageNo page number + * @param pageSize page size + * @param searchVal search value * @return schedule list page */ @Override public Result querySchedule(User loginUser, long projectCode, long processDefineCode, String searchVal, - Integer pageNo, Integer pageSize) { + Integer pageNo, Integer pageSize) { Result result = new Result(); Project project = projectMapper.queryByCode(projectCode); @@ -407,8 +412,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe } Page page = new Page<>(pageNo, pageSize); - IPage scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode, - searchVal); + IPage scheduleIPage = + scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode, searchVal); List scheduleList = new ArrayList<>(); for (Schedule schedule : scheduleIPage.getRecords()) { @@ -426,7 +431,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe /** * query schedule list * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code * @return schedule list */ @@ -436,7 +441,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe Project project = projectMapper.queryByCode(projectCode); // check project auth - boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result,null); + boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null); if (!hasProjectAndPerm) { return result; } @@ -461,7 +466,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe /** * delete schedule * - * @param projectId project id + * @param projectId project id * @param scheduleId schedule id * @throws RuntimeException runtime exception */ @@ -475,7 +480,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe * check valid * * @param result result - * @param bool bool + * @param bool bool * @param status status * @return check result code */ @@ -491,9 +496,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe /** * delete schedule by id * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param scheduleId scheule id + * @param scheduleId scheule id * @return delete result code */ @Override @@ -502,7 +507,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe Map result = new HashMap<>(); Project project = projectMapper.queryByCode(projectCode); - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode,null); + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); Status resultEnum = (Status) checkResult.get(Constants.STATUS); if (resultEnum != Status.SUCCESS) { return checkResult; @@ -516,8 +521,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe } // Determine if the login user is the owner of the schedule - if (loginUser.getId() != schedule.getUserId() - && loginUser.getUserType() != UserType.ADMIN_USER) { + if (loginUser.getId() != schedule.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) { putMsg(result, Status.USER_NO_OPERATION_PERM); return result; } @@ -542,30 +546,32 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe * preview schedule * * @param loginUser login user - * @param schedule schedule expression + * @param schedule schedule expression * @return the next five fire time */ @Override public Map previewSchedule(User loginUser, String schedule) { Map result = new HashMap<>(); - CronExpression cronExpression; + Cron cron; ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class); - Date now = new Date(); - Date startTime = DateUtils.transformTimezoneDate(scheduleParam.getStartTime(), scheduleParam.getTimezoneId()); - Date endTime = DateUtils.transformTimezoneDate(scheduleParam.getEndTime(), scheduleParam.getTimezoneId()); - startTime = now.after(startTime) ? now : startTime; + ZoneId zoneId = TimeZone.getTimeZone(scheduleParam.getTimezoneId()).toZoneId(); + ZonedDateTime now = ZonedDateTime.now(zoneId); + ZonedDateTime startTime = ZonedDateTime.ofInstant(scheduleParam.getStartTime().toInstant(), zoneId); + ZonedDateTime endTime = ZonedDateTime.ofInstant(scheduleParam.getEndTime().toInstant(), zoneId); + startTime = now.isAfter(startTime) ? now : startTime; try { - cronExpression = CronUtils.parse2CronExpression(scheduleParam.getCrontab()); - } catch (ParseException e) { + cron = CronUtils.parse2Cron(scheduleParam.getCrontab()); + } catch (CronParseException e) { logger.error(e.getMessage(), e); putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR); return result; } - List selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT); - List previewDateList = new ArrayList<>(); - selfFireDateList.forEach(date -> previewDateList.add(DateUtils.dateToString(date, scheduleParam.getTimezoneId()))); + List selfFireDateList = + CronUtils.getSelfFireDateList(startTime, endTime, cron, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT); + List previewDateList = + selfFireDateList.stream().map(DateUtils::dateToString).collect(Collectors.toList()); result.put(Constants.DATA_LIST, previewDateList); putMsg(result, Status.SUCCESS); return result; @@ -574,14 +580,14 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe /** * update process definition schedule * - * @param loginUser login user - * @param projectCode project code - * @param processDefinitionCode process definition code - * @param scheduleExpression scheduleExpression - * @param warningType warning type - * @param warningGroupId warning group id - * @param failureStrategy failure strategy - * @param workerGroup worker group + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCode process definition code + * @param scheduleExpression scheduleExpression + * @param warningType warning type + * @param warningGroupId warning group id + * @param failureStrategy failure strategy + * @param workerGroup worker group * @param processInstancePriority process instance priority * @return update result code */ @@ -598,7 +604,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe long environmentCode) { Project project = projectMapper.queryByCode(projectCode); //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,null); + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -619,17 +625,12 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe return result; } - private void updateSchedule(Map result, - Schedule schedule, - ProcessDefinition processDefinition, - String scheduleExpression, - WarningType warningType, - int warningGroupId, - FailureStrategy failureStrategy, - Priority processInstancePriority, - String workerGroup, + private void updateSchedule(Map result, Schedule schedule, ProcessDefinition processDefinition, + String scheduleExpression, WarningType warningType, int warningGroupId, + FailureStrategy failureStrategy, Priority processInstancePriority, String workerGroup, long environmentCode) { - if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) { + if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, + Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) { return; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java index 807c89a60f..c0cf9ad3ef 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java @@ -61,8 +61,8 @@ public class AuditServiceTest { @Test public void testQueryLogListPaging() { - Date start = DateUtils.getScheduleDate("2020-11-01 00:00:00"); - Date end = DateUtils.getScheduleDate("2020-11-02 00:00:00"); + Date start = DateUtils.stringToDate("2020-11-01 00:00:00"); + Date end = DateUtils.stringToDate("2020-11-02 00:00:00"); IPage page = new Page<>(1, 10); page.setRecords(getLists()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java index 7d2fb77de7..34df74d697 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java @@ -17,9 +17,16 @@ package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT_OVERVIEW; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; + import org.apache.dolphinscheduler.api.dto.CommandStateCount; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl; import org.apache.dolphinscheduler.api.service.impl.DataAnalysisServiceImpl; import org.apache.dolphinscheduler.common.Constants; @@ -39,7 +46,16 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -53,21 +69,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT_OVERVIEW; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; - /** * data analysis service test */ @@ -142,10 +143,14 @@ public class DataAnalysisServiceTest { Mockito.when(projectMapper.queryByCode(1L)).thenReturn(getProject("test")); //SUCCESS - Mockito.when(taskInstanceMapper.countTaskInstanceStateByProjectCodes(DateUtils.getScheduleDate(startDate), - DateUtils.getScheduleDate(endDate), new Long[]{1L})).thenReturn(getTaskInstanceStateCounts()); + Mockito.when(taskInstanceMapper.countTaskInstanceStateByProjectCodes(DateUtils.stringToDate(startDate), + DateUtils.stringToDate(endDate), + new Long[] {1L})).thenReturn(getTaskInstanceStateCounts()); Mockito.when(projectMapper.selectById(Mockito.any())).thenReturn(getProject("test")); - Mockito.when(projectService.hasProjectAndPerm(Mockito.any(), Mockito.any(), (Map)Mockito.any(),Mockito.any())).thenReturn(true); + Mockito.when(projectService.hasProjectAndPerm(Mockito.any(), + Mockito.any(), + (Map) Mockito.any(), + Mockito.any())).thenReturn(true); result = dataAnalysisServiceImpl.countTaskStateByProject(user, 1, startDate, endDate); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); @@ -236,18 +241,22 @@ public class DataAnalysisServiceTest { //checkProject false Map failResult = new HashMap<>(); putMsg(failResult, Status.PROJECT_NOT_FOUND, 1); - Mockito.when(projectService.checkProjectAndAuth(any(), any(), anyLong(),any())).thenReturn(failResult); + Mockito.when(projectService.checkProjectAndAuth(any(), any(), anyLong(), any())).thenReturn(failResult); failResult = dataAnalysisServiceImpl.countProcessInstanceStateByProject(user, 1, startDate, endDate); Assert.assertEquals(Status.PROJECT_NOT_FOUND, failResult.get(Constants.STATUS)); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, null); - Mockito.when(projectService.checkProjectAndAuth(any(), any(), anyLong(),any())).thenReturn(result); + Mockito.when(projectService.checkProjectAndAuth(any(), any(), anyLong(), any())).thenReturn(result); //SUCCESS - Mockito.when(processInstanceMapper.countInstanceStateByProjectCodes(DateUtils.getScheduleDate(startDate), - DateUtils.getScheduleDate(endDate), new Long[]{1L})).thenReturn(getTaskInstanceStateCounts()); - Mockito.when(projectService.hasProjectAndPerm(Mockito.any(), Mockito.any(), (Map)Mockito.any(),Mockito.any())).thenReturn(true); + Mockito.when(processInstanceMapper.countInstanceStateByProjectCodes(DateUtils.stringToDate(startDate), + DateUtils.stringToDate(endDate), + new Long[] {1L})).thenReturn(getTaskInstanceStateCounts()); + Mockito.when(projectService.hasProjectAndPerm(Mockito.any(), + Mockito.any(), + (Map) Mockito.any(), + Mockito.any())).thenReturn(true); result = dataAnalysisServiceImpl.countProcessInstanceStateByProject(user, 1, startDate, endDate); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java index 91e36c732a..cf380a33bd 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java @@ -73,8 +73,8 @@ public class DqExecuteResultServiceTest { String searchVal = ""; int ruleType = 0; - Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); - Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); + Date start = DateUtils.stringToDate("2020-01-01 00:00:00"); + Date end = DateUtils.stringToDate("2020-01-02 00:00:00"); User loginUser = new User(); loginUser.setId(1); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java index 1a2358a642..d82a22cd5b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java @@ -128,8 +128,8 @@ public class DqRuleServiceTest { String searchVal = ""; int ruleType = 0; - Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); - Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); + Date start = DateUtils.stringToDate("2020-01-01 00:00:00"); + Date end = DateUtils.stringToDate("2020-01-02 00:00:00"); User loginUser = new User(); loginUser.setId(1); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 039da1e397..dde556e453 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -171,8 +171,8 @@ public class ProcessInstanceServiceTest { "192.168.xx.xx", "",1, 10); Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) proejctAuthFailRes.getCode()); - Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); - Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); + Date start = DateUtils.stringToDate("2020-01-01 00:00:00"); + Date end = DateUtils.stringToDate("2020-01-02 00:00:00"); ProcessInstance processInstance = getProcessInstance(); List processInstanceList = new ArrayList<>(); Page pageReturn = new Page<>(1, 10); @@ -246,8 +246,8 @@ public class ProcessInstanceServiceTest { int size = 10; String startTime = "2020-01-01 00:00:00"; String endTime = "2020-08-02 00:00:00"; - Date start = DateUtils.getScheduleDate(startTime); - Date end = DateUtils.getScheduleDate(endTime); + Date start = DateUtils.stringToDate(startTime); + Date end = DateUtils.stringToDate(endTime); //project auth fail when(projectMapper.queryByCode(projectCode)).thenReturn(project); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index a2f1c539eb..0ce57acf13 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @@ -99,22 +100,44 @@ public class TaskInstanceServiceTest { //project auth fail when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result); - Result projectAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 0, "", "", - "test_user", "2019-02-26 19:48:00", "2019-02-26 19:48:22", "", null, "", 1, 20); - Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int)projectAuthFailRes.getCode()); + Result projectAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser, + projectCode, + 0, + "", + "", + "test_user", + "2019-02-26 19:48:00", + "2019-02-26 19:48:22", + "", + null, + "", + 1, + 20); + Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) projectAuthFailRes.getCode()); // data parameter check putMsg(result, Status.SUCCESS, projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_INSTANCE)).thenReturn(result); - Result dataParameterRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", - "test_user", "20200101 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20); - Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), (int)dataParameterRes.getCode()); + when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result); + Result dataParameterRes = taskInstanceService.queryTaskListPaging(loginUser, + projectCode, + 1, + "", + "", + "test_user", + "20200101 00:00:00", + "2020-01-02 00:00:00", + "", + ExecutionStatus.SUCCESS, + "192.168.xx.xx", + 1, + 20); + Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), (int) dataParameterRes.getCode()); //project putMsg(result, Status.SUCCESS, projectCode); - Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); - Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); + Date start = DateUtils.stringToDate("2020-01-01 00:00:00"); + Date end = DateUtils.stringToDate("2020-01-02 00:00:00"); ProcessInstance processInstance = getProcessInstance(); TaskInstance taskInstance = getTaskInstance(); List taskInstanceList = new ArrayList<>(); @@ -122,7 +145,7 @@ public class TaskInstanceServiceTest { taskInstanceList.add(taskInstance); pageReturn.setRecords(taskInstanceList); when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_INSTANCE)).thenReturn(result); + when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result); when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId()); when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1), eq(""), eq(""), eq(""), diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java index 3537948410..f01f440240 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java @@ -31,6 +31,9 @@ import java.util.Calendar; import java.util.Date; import java.util.TimeZone; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +48,8 @@ public final class DateUtils { static final long C6 = C5 * 24L; private static final Logger logger = LoggerFactory.getLogger(DateUtils.class); - private static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS = DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS); + private static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS = + DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS); private DateUtils() { throw new UnsupportedOperationException("Construct DateUtils"); @@ -66,7 +70,7 @@ public final class DateUtils { /** * date to local datetime * - * @param date date + * @param date date * @param zoneId zoneId * @return local datetime */ @@ -119,7 +123,8 @@ public final class DateUtils { } public static String format(Date date, DateTimeFormatter dateTimeFormatter, String timezone) { - LocalDateTime localDateTime = StringUtils.isEmpty(timezone) ? date2LocalDateTime(date) : date2LocalDateTime(date, ZoneId.of(timezone)); + LocalDateTime localDateTime = + StringUtils.isEmpty(timezone) ? date2LocalDateTime(date) : date2LocalDateTime(date, ZoneId.of(timezone)); return format(localDateTime, dateTimeFormatter); } @@ -151,7 +156,7 @@ public final class DateUtils { /** * convert time to yyyy-MM-dd HH:mm:ss format * - * @param date date + * @param date date * @param timezone timezone * @return date string */ @@ -159,11 +164,15 @@ public final class DateUtils { return format(date, YYYY_MM_DD_HH_MM_SS, timezone); } + public static String dateToString(ZonedDateTime zonedDateTime) { + return YYYY_MM_DD_HH_MM_SS.format(zonedDateTime); + } + /** * convert string to date and time * - * @param date date - * @param format format + * @param date date + * @param format format * @param timezone timezone, if null, use system default timezone * @return date */ @@ -184,20 +193,39 @@ public final class DateUtils { return null; } + public static ZonedDateTime parseZoneDateTime(@Nonnull String date, @Nonnull DateTimeFormatter dateTimeFormatter, + @Nullable String timezone) { + ZonedDateTime zonedDateTime = ZonedDateTime.parse(date, dateTimeFormatter); + if (StringUtils.isNotEmpty(timezone)) { + return zonedDateTime.withZoneSameInstant(ZoneId.of(timezone)); + } + return zonedDateTime; + } + /** * convert date str to yyyy-MM-dd HH:mm:ss format * * @param date date string * @return yyyy-MM-dd HH:mm:ss format */ - public static Date stringToDate(String date) { + public static @Nullable Date stringToDate(String date) { return parse(date, YYYY_MM_DD_HH_MM_SS, null); } + public static ZonedDateTime stringToZoneDateTime(@Nonnull String date) { + Date d = stringToDate(date); + if (d == null) { + throw new IllegalArgumentException(String.format( + "data: %s should be a validate data string - yyyy-MM-dd HH:mm:ss ", + date)); + } + return ZonedDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault()); + } + /** * convert date str to yyyy-MM-dd HH:mm:ss format * - * @param date date string + * @param date date string * @param timezone * @return yyyy-MM-dd HH:mm:ss format */ @@ -267,16 +295,6 @@ public final class DateUtils { return future.getTime() > old.getTime(); } - /** - * convert schedule string to date - * - * @param schedule schedule - * @return convert schedule string to date - */ - public static Date getScheduleDate(String schedule) { - return stringToDate(schedule); - } - /** * format time to readable * @@ -554,8 +572,10 @@ public final class DateUtils { return date; } String dateToString = dateToString(date, sourceTimezoneId); - LocalDateTime localDateTime = LocalDateTime.parse(dateToString, DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS)); - ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime, TimeZone.getTimeZone(targetTimezoneId).toZoneId()); + LocalDateTime localDateTime = + LocalDateTime.parse(dateToString, DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS)); + ZonedDateTime zonedDateTime = + ZonedDateTime.of(localDateTime, TimeZone.getTimeZone(targetTimezoneId).toZoneId()); return Date.from(zonedDateTime.toInstant()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index fdec38e65e..525aa6154e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -76,7 +76,8 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; -import org.apache.dolphinscheduler.service.corn.CronUtils; +import org.apache.dolphinscheduler.service.cron.CronUtils; +import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; @@ -323,15 +324,14 @@ public class WorkflowExecuteRunnable implements Callable { } public String getKey() { - if (StringUtils.isNotEmpty(key) - || this.processDefinition == null) { + if (StringUtils.isNotEmpty(key) || this.processDefinition == null) { return key; } key = String.format("%d_%d_%d", - this.processDefinition.getCode(), - this.processDefinition.getVersion(), - this.processInstance.getId()); + this.processDefinition.getCode(), + this.processDefinition.getVersion(), + this.processInstance.getId()); return key; } @@ -443,8 +443,7 @@ public class WorkflowExecuteRunnable implements Callable { this.stateEvents.add(nextEvent); } else { ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); - this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), - org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); + this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); } } } @@ -468,13 +467,8 @@ public class WorkflowExecuteRunnable implements Callable { } waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); if (!taskInstance.retryTaskIntervalOverTime()) { - logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", - processInstance.getId(), - newTaskInstance.getTaskCode(), - newTaskInstance.getState(), - newTaskInstance.getRetryTimes(), - newTaskInstance.getMaxRetryTimes(), - newTaskInstance.getRetryInterval()); + logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", processInstance.getId(), newTaskInstance.getTaskCode(), + newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(), newTaskInstance.getRetryInterval()); stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance); stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance); } else { @@ -490,8 +484,7 @@ public class WorkflowExecuteRunnable implements Callable { public void refreshProcessInstance(int processInstanceId) { logger.info("process instance update: {}", processInstanceId); processInstance = processService.findProcessInstanceById(processInstanceId); - processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); } @@ -612,10 +605,7 @@ public class WorkflowExecuteRunnable implements Callable { // complement data ends || no success return true; } - logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", - processInstance.getId(), - processInstance.getScheduleTime(), - complementListDate); + logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", processInstance.getId(), processInstance.getScheduleTime(), complementListDate); scheduleDate = complementListDate.get(index + 1); } //the next process complement @@ -663,8 +653,7 @@ public class WorkflowExecuteRunnable implements Callable { } private boolean needComplementProcess() { - if (processInstance.isComplementData() - && Flag.NO == processInstance.getIsSubProcess()) { + if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) { return true; } return false; @@ -719,11 +708,13 @@ public class WorkflowExecuteRunnable implements Callable { public void checkSerialProcess(ProcessDefinition processDefinition) { int nextInstanceId = processInstance.getNextProcessInstanceId(); if (nextInstanceId == 0) { - ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode(), processInstance.getId()); + ProcessInstance nextProcessInstance = + this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode(), processInstance.getId()); if (nextProcessInstance == null) { return; } - ProcessInstance nextReadyStopProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.READY_STOP.getCode(), processInstance.getId()); + ProcessInstance nextReadyStopProcessInstance = + this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.READY_STOP.getCode(), processInstance.getId()); if (processDefinition.getExecutionType().typeIsSerialPriority() && nextReadyStopProcessInstance != null) { return; } @@ -753,8 +744,7 @@ public class WorkflowExecuteRunnable implements Callable { if (this.dag != null) { return; } - processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); List recoverNodeList = getRecoverTaskInstanceList(processInstance.getCommandParam()); @@ -773,8 +763,7 @@ public class WorkflowExecuteRunnable implements Callable { // generate process to get DAG info List recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList); List startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); - ProcessDag processDag = generateFlowDag(taskNodeList, - startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType()); + ProcessDag processDag = generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType()); if (processDag == null) { logger.error("processDag is null"); return; @@ -786,7 +775,7 @@ public class WorkflowExecuteRunnable implements Callable { /** * init task queue */ - private void initTaskQueue() throws StateEventHandleException { + private void initTaskQueue() throws StateEventHandleException, CronParseException { taskFailedSubmit = false; activeTaskProcessorMaps.clear(); @@ -856,15 +845,13 @@ public class WorkflowExecuteRunnable implements Callable { if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { complementListDate = CronUtils.getSelfScheduleDateList(cmdParam); } - logger.info(" process definition code:{} complement data: {}", - processInstance.getProcessDefinitionCode(), complementListDate); + logger.info(" process definition code:{} complement data: {}", processInstance.getProcessDefinitionCode(), complementListDate); if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(complementListDate.get(0)); - String globalParams = curingParamsService.curingGlobalParams(processInstance.getId(), - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)); + String globalParams = + curingParamsService.curingGlobalParams(processInstance.getId(), processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA, + processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)); processInstance.setGlobalParams(globalParams); processService.updateProcessInstance(processInstance); } @@ -887,16 +874,13 @@ public class WorkflowExecuteRunnable implements Callable { ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); taskProcessor.init(taskInstance, processInstance); - if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION - && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { + if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); } boolean submit = taskProcessor.action(TaskAction.SUBMIT); if (!submit) { - logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", - processInstance.getId(), processInstance.getName(), - taskInstance.getId(), taskInstance.getName()); + logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); return Optional.empty(); } @@ -1373,11 +1357,8 @@ public class WorkflowExecuteRunnable implements Callable { * @return ExecutionStatus */ private ExecutionStatus runningState(ExecutionStatus state) { - if (state == ExecutionStatus.READY_STOP - || state == ExecutionStatus.READY_PAUSE - || state == ExecutionStatus.WAITING_THREAD - || state == ExecutionStatus.READY_BLOCK - || state == ExecutionStatus.DELAY_EXECUTION) { + if (state == ExecutionStatus.READY_STOP || state == ExecutionStatus.READY_PAUSE || state == ExecutionStatus.WAITING_THREAD || state == ExecutionStatus.READY_BLOCK || + state == ExecutionStatus.DELAY_EXECUTION) { // if the running task is not completed, the state remains unchanged return state; } else { @@ -1412,9 +1393,7 @@ public class WorkflowExecuteRunnable implements Callable { return true; } if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { - return readyToSubmitTaskQueue.size() == 0 - && activeTaskProcessorMaps.size() == 0 - && waitToRetryTaskInstanceMap.size() == 0; + return readyToSubmitTaskQueue.size() == 0 && activeTaskProcessorMaps.size() == 0 && waitToRetryTaskInstanceMap.size() == 0; } } return false; @@ -1444,10 +1423,7 @@ public class WorkflowExecuteRunnable implements Callable { } List pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE); - if (CollectionUtils.isNotEmpty(pauseList) - || processInstance.isBlocked() - || !isComplementEnd() - || readyToSubmitTaskQueue.size() > 0) { + if (CollectionUtils.isNotEmpty(pauseList) || processInstance.isBlocked() || !isComplementEnd() || readyToSubmitTaskQueue.size() > 0) { return ExecutionStatus.PAUSE; } else { return ExecutionStatus.SUCCESS; @@ -1511,10 +1487,7 @@ public class WorkflowExecuteRunnable implements Callable { List stopList = getCompleteTaskByState(ExecutionStatus.STOP); List killList = getCompleteTaskByState(ExecutionStatus.KILL); List failList = getCompleteTaskByState(ExecutionStatus.FAILURE); - if (CollectionUtils.isNotEmpty(stopList) - || CollectionUtils.isNotEmpty(killList) - || CollectionUtils.isNotEmpty(failList) - || !isComplementEnd()) { + if (CollectionUtils.isNotEmpty(stopList) || CollectionUtils.isNotEmpty(killList) || CollectionUtils.isNotEmpty(failList) || !isComplementEnd()) { return ExecutionStatus.STOP; } else { return ExecutionStatus.SUCCESS; @@ -1555,7 +1528,7 @@ public class WorkflowExecuteRunnable implements Callable { } Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); - Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + Date endTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); return processInstance.getScheduleTime().equals(endTime); } diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml index bd05a15f45..8a3f18b555 100644 --- a/dolphinscheduler-service/pom.xml +++ b/dolphinscheduler-service/pom.xml @@ -55,24 +55,6 @@ com.cronutils cron-utils - - org.quartz-scheduler - quartz - - - com.mchange - c3p0 - - - com.mchange - mchange-commons-java - - - com.zaxxer - HikariCP-java7 - - - io.micrometer diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/AbstractCycle.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/AbstractCycle.java similarity index 99% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/AbstractCycle.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/AbstractCycle.java index 8e69c458c4..aa890af99d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/AbstractCycle.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/AbstractCycle.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.service.corn; +package org.apache.dolphinscheduler.service.cron; import org.apache.dolphinscheduler.common.enums.CycleEnum; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java similarity index 59% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java index e25569236e..da62ba47ef 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java @@ -15,16 +15,16 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.service.corn; +package org.apache.dolphinscheduler.service.cron; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import static org.apache.dolphinscheduler.common.Constants.COMMA; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.day; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.min; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.month; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.week; -import static org.apache.dolphinscheduler.service.corn.CycleFactory.year; +import static org.apache.dolphinscheduler.service.cron.CycleFactory.day; +import static org.apache.dolphinscheduler.service.cron.CycleFactory.hour; +import static org.apache.dolphinscheduler.service.cron.CycleFactory.min; +import static org.apache.dolphinscheduler.service.cron.CycleFactory.month; +import static org.apache.dolphinscheduler.service.cron.CycleFactory.week; +import static org.apache.dolphinscheduler.service.cron.CycleFactory.year; import static com.cronutils.model.CronType.QUARTZ; @@ -33,27 +33,32 @@ import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.commons.collections.CollectionUtils; -import java.text.ParseException; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Calendar; -import java.util.Collections; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; -import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.cronutils.model.Cron; import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.model.time.ExecutionTime; import com.cronutils.parser.CronParser; +import lombok.NonNull; + /** * // todo: this utils is heavy, it rely on quartz and corn-utils. * cron utils @@ -74,19 +79,12 @@ public class CronUtils { * @param cronExpression cron expression, never null * @return Cron instance, corresponding to cron expression received */ - public static Cron parse2Cron(String cronExpression) { - return QUARTZ_CRON_PARSER.parse(cronExpression); - } - - /** - * build a new CronExpression based on the string cronExpression - * - * @param cronExpression String representation of the cron expression the new object should represent - * @return CronExpression - * @throws ParseException if the string expression cannot be parsed into a valid - */ - public static CronExpression parse2CronExpression(String cronExpression) throws ParseException { - return new CronExpression(cronExpression); + public static Cron parse2Cron(String cronExpression) throws CronParseException { + try { + return QUARTZ_CRON_PARSER.parse(cronExpression); + } catch (Exception ex) { + throw new CronParseException(String.format("Parse corn expression: [%s] error", cronExpression), ex); + } } /** @@ -106,7 +104,12 @@ public class CronUtils { * @return CycleEnum */ public static CycleEnum getMiniCycle(Cron cron) { - return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron)).addCycle(year(cron)).getMiniCycle(); + return min(cron).addCycle(hour(cron)) + .addCycle(day(cron)) + .addCycle(week(cron)) + .addCycle(month(cron)) + .addCycle(year(cron)) + .getMiniCycle(); } /** @@ -116,23 +119,41 @@ public class CronUtils { * @return CycleEnum */ public static CycleEnum getMaxCycle(String crontab) { - return getMaxCycle(parse2Cron(crontab)); + try { + return getMaxCycle(parse2Cron(crontab)); + } catch (CronParseException ex) { + throw new RuntimeException("Get max cycle error", ex); + } + } + + + public static List getFireDateList(@NonNull ZonedDateTime startTime, + @NonNull ZonedDateTime endTime, + @NonNull String cron) throws CronParseException { + return getFireDateList(startTime, endTime, parse2Cron(cron)); } /** * gets all scheduled times for a period of time based on not self dependency * * @param startTime startTime - * @param endTime endTime - * @param cronExpression cronExpression + * @param endTime endTime + * @param cron cron * @return date list */ - public static List getFireDateList(Date startTime, Date endTime, CronExpression cronExpression) { - List dateList = new ArrayList<>(); + public static List getFireDateList(@NonNull ZonedDateTime startTime, + @NonNull ZonedDateTime endTime, + @NonNull Cron cron) { + List dateList = new ArrayList<>(); + ExecutionTime executionTime = ExecutionTime.forCron(cron); while (Stopper.isRunning()) { - startTime = cronExpression.getNextValidTimeAfter(startTime); - if (startTime == null || startTime.after(endTime)) { + Optional nextExecutionTimeOptional = executionTime.nextExecution(startTime); + if (!nextExecutionTimeOptional.isPresent()) { + break; + } + startTime = nextExecutionTimeOptional.get(); + if (startTime.isAfter(endTime)) { break; } dateList.add(startTime); @@ -142,64 +163,62 @@ public class CronUtils { } /** - * gets expect scheduled times for a period of time based on self dependency + * Gets expect scheduled times for a period of time based on self dependency * * @param startTime startTime - * @param endTime endTime - * @param cronExpression cronExpression + * @param endTime endTime + * @param cron cron * @param fireTimes fireTimes - * @return date list + * @return nextTime execution list */ - public static List getSelfFireDateList(Date startTime, Date endTime, CronExpression cronExpression, int fireTimes) { - List dateList = new ArrayList<>(); + public static List getSelfFireDateList(@NonNull ZonedDateTime startTime, + @NonNull ZonedDateTime endTime, @NonNull Cron cron, + int fireTimes) { + List executeTimes = new ArrayList<>(); + ExecutionTime executionTime = ExecutionTime.forCron(cron); while (fireTimes > 0) { - startTime = cronExpression.getNextValidTimeAfter(startTime); - if (startTime == null || startTime.after(endTime) || startTime.equals(endTime)) { + Optional nextTime = executionTime.nextExecution(startTime); + if (!nextTime.isPresent()) { break; } - dateList.add(startTime); + startTime = nextTime.get(); + if (startTime.isAfter(endTime)) { + break; + } + executeTimes.add(startTime); fireTimes--; } - - return dateList; + return executeTimes; } - /** - * gets all scheduled times for a period of time based on self dependency - * - * @param startTime startTime - * @param endTime endTime - * @param cronExpression cronExpression - * @return date list - */ - public static List getSelfFireDateList(Date startTime, Date endTime, CronExpression cronExpression) { - List dateList = new ArrayList<>(); - - while (Stopper.isRunning()) { - startTime = cronExpression.getNextValidTimeAfter(startTime); - if (startTime == null || startTime.after(endTime) || startTime.equals(endTime)) { - break; - } - dateList.add(startTime); - } + public static List getSelfFireDateList(@NonNull final Date startTime, + @NonNull final Date endTime, + @NonNull final List schedules) throws CronParseException { + ZonedDateTime zonedDateTimeStart = ZonedDateTime.ofInstant(startTime.toInstant(), ZoneId.systemDefault()); + ZonedDateTime zonedDateTimeEnd = ZonedDateTime.ofInstant(endTime.toInstant(), ZoneId.systemDefault()); - return dateList; + return getSelfFireDateList(zonedDateTimeStart, zonedDateTimeEnd, schedules).stream() + .map(zonedDateTime -> new Date(zonedDateTime.toInstant().toEpochMilli())) + .collect(Collectors.toList()); } /** * gets all scheduled times for a period of time based on self dependency * if schedulers is empty then default scheduler = 1 day */ - public static List getSelfFireDateList(final Date startTime, final Date endTime, final List schedules) { - List result = new ArrayList<>(); + public static List getSelfFireDateList(@NonNull final ZonedDateTime startTime, + @NonNull final ZonedDateTime endTime, + @NonNull final List schedules) + throws CronParseException { + List result = new ArrayList<>(); if (startTime.equals(endTime)) { result.add(startTime); return result; } // support left closed and right closed time interval (startDate <= N <= endDate) - Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS); - Date to = new Date(endTime.getTime() + Constants.SECOND_TIME_MILLIS); + ZonedDateTime from = startTime.minusSeconds(1L); + ZonedDateTime to = endTime.plusSeconds(1L); List listSchedule = new ArrayList<>(); listSchedule.addAll(schedules); @@ -209,30 +228,11 @@ public class CronUtils { listSchedule.add(schedule); } for (Schedule schedule : listSchedule) { - result.addAll(CronUtils.getSelfFireDateList(from, to, schedule.getCrontab())); + result.addAll(CronUtils.getFireDateList(from, to, schedule.getCrontab())); } return result; } - /** - * gets all scheduled times for a period of time based on self dependency - * - * @param startTime startTime - * @param endTime endTime - * @param cron cron - * @return date list - */ - public static List getSelfFireDateList(Date startTime, Date endTime, String cron) { - CronExpression cronExpression = null; - try { - cronExpression = parse2CronExpression(cron); - } catch (ParseException e) { - logger.error(e.getMessage(), e); - return Collections.emptyList(); - } - return getSelfFireDateList(startTime, endTime, cronExpression); - } - /** * get expiration time * @@ -289,8 +289,9 @@ public class CronUtils { /** * get Schedule Date + * * @param param - * @return date list + * @return date list */ public static List getSelfScheduleDateList(Map param) { List result = new ArrayList<>(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleFactory.java similarity index 99% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleFactory.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleFactory.java index 1a133ee7ea..00ca1029c6 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleFactory.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleFactory.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.service.corn; +package org.apache.dolphinscheduler.service.cron; import com.cronutils.model.Cron; import com.cronutils.model.field.expression.Always; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleLinks.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleLinks.java similarity index 97% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleLinks.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleLinks.java index 7cc4a87f07..d4011bcd74 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleLinks.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleLinks.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.service.corn; +package org.apache.dolphinscheduler.service.cron; import com.cronutils.model.Cron; import org.apache.dolphinscheduler.common.enums.CycleEnum; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/CronParseException.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/CronParseException.java new file mode 100644 index 0000000000..cfc46d7572 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/CronParseException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.exceptions; + +public class CronParseException extends Exception { + + public CronParseException(String message) { + super(message); + } + + public CronParseException(String message, Throwable throwable) { + super(message, throwable); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 6558940673..f6eab1befa 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; +import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.spi.enums.ResourceType; import java.util.Date; @@ -61,7 +62,7 @@ import org.springframework.transaction.annotation.Transactional; public interface ProcessService { @Transactional - ProcessInstance handleCommand(String host, Command command); + ProcessInstance handleCommand(String host, Command command) throws CronParseException; void moveToErrorCommand(Command command, String message); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 08921e72a7..f9d0782f95 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.service.process; -import io.micrometer.core.annotation.Counted; -import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; @@ -33,6 +31,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; +import static java.util.stream.Collectors.toSet; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -51,7 +51,6 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.DagData; import org.apache.dolphinscheduler.dao.entity.DataSource; @@ -130,12 +129,14 @@ import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.corn.CronUtils; +import org.apache.dolphinscheduler.service.cron.CronUtils; +import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.enums.ResourceType; +import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.commons.collections.CollectionUtils; @@ -152,7 +153,6 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -166,6 +166,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import io.micrometer.core.annotation.Counted; + /** * process relative dao that some mappers in this. */ @@ -277,14 +279,13 @@ public class ProcessServiceImpl implements ProcessService { /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * - * @param logger logger * @param host host * @param command found command * @return process instance */ @Override @Transactional - public ProcessInstance handleCommand(String host, Command command) { + public ProcessInstance handleCommand(String host, Command command) throws CronParseException { ProcessInstance processInstance = constructProcessInstance(command, host); // cannot construct process instance, return null if (processInstance == null) { @@ -731,15 +732,14 @@ public class ProcessServiceImpl implements ProcessService { * @param cmdParam cmdParam map * @return date */ - private Date getScheduleTime(Command command, Map cmdParam) { + private Date getScheduleTime(Command command, Map cmdParam) throws CronParseException { Date scheduleTime = command.getScheduleTime(); - if (scheduleTime == null - && cmdParam != null - && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { + if (scheduleTime == null && cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); - List schedules = queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); + List schedules = + queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); List complementDateList = CronUtils.getSelfFireDateList(start, end, schedules); if (complementDateList.size() > 0) { @@ -922,12 +922,13 @@ public class ProcessServiceImpl implements ProcessService { * @param host host * @return process instance */ - protected ProcessInstance constructProcessInstance(Command command, String host) { + protected ProcessInstance constructProcessInstance(Command command, String host) throws CronParseException { ProcessInstance processInstance; ProcessDefinition processDefinition; CommandType commandType = command.getCommandType(); - processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion()); + processDefinition = + this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion()); if (processDefinition == null) { logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode()); return null; @@ -1122,7 +1123,7 @@ public class ProcessServiceImpl implements ProcessService { */ private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, - Map cmdParam) { + Map cmdParam) throws CronParseException { if (!processInstance.isComplementData()) { return; } @@ -1130,16 +1131,16 @@ public class ProcessServiceImpl implements ProcessService { Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); List complementDate = Lists.newLinkedList(); - if(start != null && end != null){ - List listSchedules = queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); + if (start != null && end != null) { + List listSchedules = + queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); complementDate = CronUtils.getSelfFireDateList(start, end, listSchedules); } - if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ + if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { complementDate = CronUtils.getSelfScheduleDateList(cmdParam); } - if (complementDate.size() > 0 - && Flag.NO == processInstance.getIsSubProcess()) { + if (complementDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(complementDate.get(0)); } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java index b53062e975..1e2b2548eb 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java @@ -24,9 +24,10 @@ import static com.cronutils.model.field.expression.FieldExpressionFactory.questi import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.service.corn.CronUtils; +import org.apache.dolphinscheduler.service.exceptions.CronParseException; -import java.text.ParseException; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Date; import org.junit.Assert; @@ -59,15 +60,9 @@ public class CronUtilsTest { */ @Test public void testCronAsString() { - Cron cron = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) - .withYear(always()) - .withDoW(questionMark()) - .withMonth(always()) - .withDoM(always()) - .withHour(always()) - .withMinute(every(5)) - .withSecond(on(0)) - .instance(); + Cron cron = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)).withYear(always()) + .withDoW(questionMark()).withMonth(always()).withDoM(always()).withHour(always()).withMinute(every(5)) + .withSecond(on(0)).instance(); // Obtain the string expression String cronAsString = cron.asString(); @@ -78,11 +73,9 @@ public class CronUtilsTest { /** * cron parse test - * - * @throws ParseException if error throws ParseException */ @Test - public void testCronParse() throws ParseException { + public void testCronParse() throws CronParseException { String strCrontab = "0 1 2 3 * ? *"; Cron depCron = CronUtils.parse2Cron(strCrontab); @@ -96,11 +89,9 @@ public class CronUtilsTest { /** * schedule type test - * - * @throws ParseException if error throws ParseException */ @Test - public void testScheduleType() throws ParseException { + public void testScheduleType() throws CronParseException { CycleEnum cycleEnum = CronUtils.getMaxCycle(CronUtils.parse2Cron("0 */1 * * * ? *")); Assert.assertEquals("MINUTE", cycleEnum.name()); @@ -129,23 +120,15 @@ public class CronUtilsTest { * test */ @Test - public void test2() { - Cron cron1 = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) - .withYear(always()) - .withDoW(questionMark()) - .withMonth(always()) - .withDoM(always()) - .withHour(always()) - .withMinute(every(5)) - .withSecond(on(0)) - .instance(); + public void test2() throws CronParseException { + Cron cron1 = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)).withYear(always()) + .withDoW(questionMark()).withMonth(always()).withDoM(always()).withHour(always()).withMinute(every(5)) + .withSecond(on(0)).instance(); // minute cycle - String[] cronArayy = new String[] {"* * * * * ? *", "* 0 * * * ? *", - "* 5 * * 3/5 ? *", "0 0 * * * ? *", "0 0 7 * 1 ? *", "0 0 7 * 1/1 ? *", "0 0 7 * 1-2 ? *", "0 0 7 * 1,2 ? *"}; + String[] cronArayy = + new String[] {"* * * * * ? *", "* 0 * * * ? *", "* 5 * * 3/5 ? *", "0 0 * * * ? *", "0 0 7 * 1 ? *", + "0 0 7 * 1/1 ? *", "0 0 7 * 1-2 ? *", "0 0 7 * 1,2 ? *"}; for (String minCrontab : cronArayy) { - if (!org.quartz.CronExpression.isValidExpression(minCrontab)) { - throw new RuntimeException(minCrontab + " verify failure, cron expression not valid"); - } Cron cron = CronUtils.parse2Cron(minCrontab); CronField minField = cron.retrieve(CronFieldName.MINUTE); logger.info("minField instanceof Between:" + (minField.getExpression() instanceof Between)); @@ -166,7 +149,8 @@ public class CronUtilsTest { logger.info("dayOfMonthField instanceof Every:" + (dayOfMonthField.getExpression() instanceof Every)); logger.info("dayOfMonthField instanceof On:" + (dayOfMonthField.getExpression() instanceof On)); logger.info("dayOfMonthField instanceof And:" + (dayOfMonthField.getExpression() instanceof And)); - logger.info("dayOfMonthField instanceof QuestionMark:" + (dayOfMonthField.getExpression() instanceof QuestionMark)); + logger.info( + "dayOfMonthField instanceof QuestionMark:" + (dayOfMonthField.getExpression() instanceof QuestionMark)); CronField monthField = cron.retrieve(CronFieldName.MONTH); logger.info("monthField instanceof Between:" + (monthField.getExpression() instanceof Between)); @@ -182,7 +166,8 @@ public class CronUtilsTest { logger.info("dayOfWeekField instanceof Every:" + (dayOfWeekField.getExpression() instanceof Every)); logger.info("dayOfWeekField instanceof On:" + (dayOfWeekField.getExpression() instanceof On)); logger.info("dayOfWeekField instanceof And:" + (dayOfWeekField.getExpression() instanceof And)); - logger.info("dayOfWeekField instanceof QuestionMark:" + (dayOfWeekField.getExpression() instanceof QuestionMark)); + logger.info( + "dayOfWeekField instanceof QuestionMark:" + (dayOfWeekField.getExpression() instanceof QuestionMark)); CronField yearField = cron.retrieve(CronFieldName.YEAR); logger.info("yearField instanceof Between:" + (yearField.getExpression() instanceof Between)); @@ -203,27 +188,39 @@ public class CronUtilsTest { } @Test - public void getSelfFireDateList() throws ParseException { - Date from = DateUtils.stringToDate("2020-01-01 00:00:00"); - Date to = DateUtils.stringToDate("2020-01-31 00:00:00"); + public void getSelfFireDateList() throws CronParseException { + ZonedDateTime from = + ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:00:00").toInstant(), ZoneId.systemDefault()); + ZonedDateTime to = + ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-31 00:00:00").toInstant(), ZoneId.systemDefault()); // test date - Assert.assertEquals(0, CronUtils.getSelfFireDateList(to, from, "0 0 0 * * ? ").size()); - // test error cron - Assert.assertEquals(0, CronUtils.getSelfFireDateList(from, to, "0 0 0 * *").size()); + Assert.assertEquals(0, CronUtils.getFireDateList(to, from, "0 0 0 * * ? ").size()); + try { + // test error cron + // should throw exception + CronUtils.getFireDateList(from, to, "0 0 0 * *").size(); + Assert.assertTrue(false); + } catch (CronParseException cronParseException) { + Assert.assertTrue(true); + } // test cron - Assert.assertEquals(29, CronUtils.getSelfFireDateList(from, to, "0 0 0 * * ? ").size()); + Assert.assertEquals(30, CronUtils.getFireDateList(from, to, "0 0 0 * * ? ").size()); // test other - Assert.assertEquals(30, CronUtils.getFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? ")).size()); - Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? "), 5).size()); - from = DateUtils.stringToDate("2020-01-01 00:02:00"); - to = DateUtils.stringToDate("2020-01-01 00:02:00"); - Assert.assertEquals(1, CronUtils.getFireDateList(new Date(from.getTime() - 1000), to, CronUtils.parse2CronExpression("0 * * * * ? ")).size()); - - from = DateUtils.stringToDate("2020-01-01 00:02:00"); - to = DateUtils.stringToDate("2020-01-01 00:04:00"); - Assert.assertEquals(2, CronUtils.getFireDateList(new Date(from.getTime() - 1000), - new Date(to.getTime() - 1000), - CronUtils.parse2CronExpression("0 * * * * ? ")).size()); + Assert.assertEquals(30, CronUtils.getFireDateList(from, to, CronUtils.parse2Cron("0 0 0 * * ? ")).size()); + Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to, CronUtils.parse2Cron("0 0 0 * * ? "), 5).size()); + from = + ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:02:00").toInstant(), ZoneId.systemDefault()); + to = ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:02:00").toInstant(), ZoneId.systemDefault()); + Assert.assertEquals(1, + CronUtils.getFireDateList(from.minusSeconds(1L), to, CronUtils.parse2Cron("0 * * * * ? ")).size()); + + from = + ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:02:00").toInstant(), ZoneId.systemDefault()); + to = ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:04:00").toInstant(), + ZoneId.systemDefault()); + Assert.assertEquals(2, + CronUtils.getFireDateList(from.minusSeconds(1L), to.minusSeconds(1L), CronUtils.parse2Cron("0 * * * * ? ")) + .size()); } @Test diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 2e65a0cb5b..26b9e66045 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -284,7 +285,7 @@ public class ProcessServiceTest { } @Test - public void testHandleCommand() { + public void testHandleCommand() throws CronParseException { //cannot construct process instance, return null; String host = "127.0.0.1"; @@ -461,7 +462,7 @@ public class ProcessServiceTest { } @Test(expected = ServiceException.class) - public void testDeleteNotExistCommand() { + public void testDeleteNotExistCommand() throws CronParseException { String host = "127.0.0.1"; int definitionVersion = 1; long definitionCode = 123; diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml index 9ef5e94cd0..45a9890738 100644 --- a/dolphinscheduler-worker/pom.xml +++ b/dolphinscheduler-worker/pom.xml @@ -34,12 +34,6 @@ org.apache.dolphinscheduler dolphinscheduler-service - - - org.springframework.boot - spring-boot-starter-quartz - - org.apache.dolphinscheduler