Browse Source

Remove quartz in service (#10748)

* Remove quartz in service
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
426567348e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java
  2. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
  3. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java
  4. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java
  5. 282
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  6. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  7. 77
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
  8. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java
  9. 59
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
  10. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java
  11. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java
  12. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  13. 43
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  14. 50
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
  15. 83
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  16. 18
      dolphinscheduler-service/pom.xml
  17. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/AbstractCycle.java
  18. 161
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java
  19. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleFactory.java
  20. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleLinks.java
  21. 29
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/CronParseException.java
  22. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  23. 41
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  24. 99
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java
  25. 5
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  26. 6
      dolphinscheduler-worker/pom.xml

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java

@ -193,7 +193,7 @@ public class BaseServiceImpl implements BaseService {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
Date start = null; Date start = null;
if (!StringUtils.isEmpty(startDateStr)) { if (!StringUtils.isEmpty(startDateStr)) {
start = DateUtils.getScheduleDate(startDateStr); start = DateUtils.stringToDate(startDateStr);
if (Objects.isNull(start)) { if (Objects.isNull(start)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result; return result;
@ -203,7 +203,7 @@ public class BaseServiceImpl implements BaseService {
Date end = null; Date end = null;
if (!StringUtils.isEmpty(endDateStr)) { if (!StringUtils.isEmpty(endDateStr)) {
end = DateUtils.getScheduleDate(endDateStr); end = DateUtils.stringToDate(endDateStr);
if (Objects.isNull(end)) { if (Objects.isNull(end)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result; return result;

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java

@ -166,8 +166,8 @@ public class DataAnalysisServiceImpl extends BaseServiceImpl implements DataAnal
Date start = null; Date start = null;
Date end = null; Date end = null;
if (!StringUtils.isEmpty(startDate) && !StringUtils.isEmpty(endDate)) { if (!StringUtils.isEmpty(startDate) && !StringUtils.isEmpty(endDate)) {
start = DateUtils.getScheduleDate(startDate); start = DateUtils.stringToDate(startDate);
end = DateUtils.getScheduleDate(endDate); end = DateUtils.stringToDate(endDate);
if (Objects.isNull(start) || Objects.isNull(end)) { if (Objects.isNull(start) || Objects.isNull(end)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result; return result;

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java

@ -66,10 +66,10 @@ public class DqExecuteResultServiceImpl extends BaseServiceImpl implements DqExe
Date end = null; Date end = null;
try { try {
if (StringUtils.isNotEmpty(startTime)) { if (StringUtils.isNotEmpty(startTime)) {
start = DateUtils.getScheduleDate(startTime); start = DateUtils.stringToDate(startTime);
} }
if (StringUtils.isNotEmpty(endTime)) { if (StringUtils.isNotEmpty(endTime)) {
end = DateUtils.getScheduleDate(endTime); end = DateUtils.stringToDate(endTime);
} }
} catch (Exception e) { } catch (Exception e) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startTime,endTime"); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startTime,endTime");

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java

@ -163,10 +163,10 @@ public class DqRuleServiceImpl extends BaseServiceImpl implements DqRuleService
Date end = null; Date end = null;
try { try {
if (StringUtils.isNotEmpty(startTime)) { if (StringUtils.isNotEmpty(startTime)) {
start = DateUtils.getScheduleDate(startTime); start = DateUtils.stringToDate(startTime);
} }
if (StringUtils.isNotEmpty(endTime)) { if (StringUtils.isNotEmpty(endTime)) {
end = DateUtils.getScheduleDate(endTime); end = DateUtils.stringToDate(endTime);
} }
} catch (Exception e) { } catch (Exception e) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startTime,endTime"); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startTime,endTime");

282
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -17,12 +17,17 @@
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import com.fasterxml.jackson.core.type.TypeReference; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
import com.google.common.collect.Lists; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import org.apache.commons.beanutils.BeanUtils; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import org.apache.commons.collections.CollectionUtils; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import org.apache.commons.collections.MapUtils; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import org.apache.commons.lang3.StringUtils; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
@ -66,16 +71,18 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.time.ZonedDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -83,16 +90,13 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; import org.slf4j.Logger;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import org.springframework.beans.factory.annotation.Autowired;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import org.springframework.stereotype.Service;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; import com.fasterxml.jackson.core.type.TypeReference;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import com.google.common.collect.Lists;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LENGTH;
/** /**
* executor service impl * executor service impl
@ -155,18 +159,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return execute process instance code * @return execute process instance code
*/ */
@Override @Override
public Map<String, Object> execProcessInstance(User loginUser, long projectCode, public Map<String, Object> execProcessInstance(User loginUser, long projectCode, long processDefinitionCode,
long processDefinitionCode, String cronTime, CommandType commandType, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList, FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId, TaskDependType taskDependType, WarningType warningType,
RunMode runMode, int warningGroupId, RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode,Integer timeout, Priority processInstancePriority, String workerGroup,
Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber, Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun, int dryRun, ComplementDependentMode complementDependentMode) {
ComplementDependentMode complementDependentMode) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
//check user access for project //check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_START); Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_START);
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
@ -190,7 +195,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result; return result;
} }
if(!checkScheduleTimeNum(commandType,cronTime)){ if (!checkScheduleTimeNum(commandType, cronTime)) {
putMsg(result, Status.SCHEDULE_TIME_NUMBER); putMsg(result, Status.SCHEDULE_TIME_NUMBER);
return result; return result;
} }
@ -202,10 +207,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/** /**
* create command * create command
*/ */
int create = this.createCommand(commandType, processDefinition.getCode(), int create =
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), this.createCommand(commandType, processDefinition.getCode(), taskDependType, failureStrategy, startNodeList,
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup,
expectedParallelismNumber, dryRun, complementDependentMode); environmentCode, startParams, expectedParallelismNumber, dryRun, complementDependentMode);
if (create > 0) { if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId); processDefinition.setWarningGroupId(warningGroupId);
@ -236,19 +241,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} }
/** /**
*
* @param complementData * @param complementData
* @param cronTime * @param cronTime
* @return CommandType is COMPLEMENT_DATA and cronTime's number is not greater than 100 return true , otherwise return false * @return CommandType is COMPLEMENT_DATA and cronTime's number is not greater than 100 return true , otherwise return false
*/ */
private boolean checkScheduleTimeNum(CommandType complementData,String cronTime) { private boolean checkScheduleTimeNum(CommandType complementData, String cronTime) {
if (!CommandType.COMPLEMENT_DATA.equals(complementData)) { if (!CommandType.COMPLEMENT_DATA.equals(complementData)) {
return true; return true;
} }
if(cronTime == null){ if (cronTime == null) {
return true; return true;
} }
Map<String,String> cronMap = JSONUtils.toMap(cronTime); Map<String, String> cronMap = JSONUtils.toMap(cronTime);
if (cronMap.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { if (cronMap.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
String[] stringDates = cronMap.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA); String[] stringDates = cronMap.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA);
if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) { if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) {
@ -268,7 +272,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return check result code * @return check result code
*/ */
@Override @Override
public Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode, Integer version) { public Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition,
long processDefineCode, Integer version) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
// check process definition exists // check process definition exists
@ -287,31 +292,37 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/** /**
* check if the current process has subprocesses and all subprocesses are valid * check if the current process has subprocesses and all subprocesses are valid
*
* @param processDefinition * @param processDefinition
* @return check result * @return check result
*/ */
@Override @Override
public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition) { public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition) {
// query all subprocesses under the current process // query all subprocesses under the current process
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode()); List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode());
if (processTaskRelations.isEmpty()) { if (processTaskRelations.isEmpty()) {
return true; return true;
} }
Set<Long> relationCodes = processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet()); Set<Long> relationCodes =
processTaskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toSet());
List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(relationCodes); List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(relationCodes);
// find out the process definition code // find out the process definition code
Set<Long> processDefinitionCodeSet = new HashSet<>(); Set<Long> processDefinitionCodeSet = new HashSet<>();
taskDefinitions.stream() taskDefinitions.stream()
.filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())) .filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())).forEach(
.forEach(taskDefinition -> processDefinitionCodeSet.add(Long.valueOf(JSONUtils.getNodeString(taskDefinition.getTaskParams(), Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)))); taskDefinition -> processDefinitionCodeSet.add(Long.valueOf(
JSONUtils.getNodeString(taskDefinition.getTaskParams(), Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE))));
if (processDefinitionCodeSet.isEmpty()) { if (processDefinitionCodeSet.isEmpty()) {
return true; return true;
} }
// check sub releaseState // check sub releaseState
List<ProcessDefinition> processDefinitions = processDefinitionMapper.queryByCodes(processDefinitionCodeSet); List<ProcessDefinition> processDefinitions = processDefinitionMapper.queryByCodes(processDefinitionCodeSet);
return processDefinitions.stream().filter(definition -> definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet()).isEmpty(); return processDefinitions.stream()
.filter(definition -> definition.getReleaseState().equals(ReleaseState.OFFLINE)).collect(Collectors.toSet())
.isEmpty();
} }
@ -329,7 +340,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
//check user access for project //check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, ApiFuncIdentificationConstant.map.get(executeType)); Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.map.get(executeType));
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
@ -345,10 +357,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result; return result;
} }
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()); processInstance.getProcessDefinitionVersion());
if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) { if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); result =
checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
@ -365,7 +380,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} }
//get the startParams user specified at the first starting while repeat running is needed //get the startParams user specified at the first starting while repeat running is needed
Map<String, Object> commandMap = JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {}); Map<String, Object> commandMap =
JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
});
String startParams = null; String startParams = null;
if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) { if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS); Object startParamsJson = commandMap.get(Constants.CMD_PARAM_START_PARAMS);
@ -376,19 +393,24 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
switch (executeType) { switch (executeType) {
case REPEAT_RUNNING: case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams); result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
break; break;
case RECOVER_SUSPENDED_PROCESS: case RECOVER_SUSPENDED_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
break; break;
case START_FAILURE_TASK_PROCESS: case START_FAILURE_TASK_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams); result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
break; break;
case STOP: case STOP:
if (processInstance.getState() == ExecutionStatus.READY_STOP) { if (processInstance.getState() == ExecutionStatus.READY_STOP) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else { } else {
result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP); result =
updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
} }
break; break;
case PAUSE: case PAUSE:
@ -432,8 +454,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return true if tenant suitable, otherwise return false * @return true if tenant suitable, otherwise return false
*/ */
private boolean checkTenantSuitable(ProcessDefinition processDefinition) { private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(), Tenant tenant =
processDefinition.getUserId()); processService.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId());
return tenant != null; return tenant != null;
} }
@ -475,7 +497,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break; break;
} }
if (!checkResult) { if (!checkResult) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString()); putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(),
executionStatus.toString(), executeType.toString());
} else { } else {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} }
@ -490,7 +513,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param executionStatus execute status * @param executionStatus execute status
* @return update result * @return update result
*/ */
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) { private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType,
ExecutionStatus executionStatus) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
processInstance.setCommandType(commandType); processInstance.setCommandType(commandType);
@ -528,8 +552,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
taskGroupQueue.setForceStart(Flag.YES.getCode()); taskGroupQueue.setForceStart(Flag.YES.getCode());
processService.updateTaskGroupQueue(taskGroupQueue); processService.updateTaskGroupQueue(taskGroupQueue);
processService.sendStartTask2Master(processInstance, taskGroupQueue.getTaskId() processService.sendStartTask2Master(processInstance, taskGroupQueue.getTaskId(),
,org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST); org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
@ -544,7 +568,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param commandType command type * @param commandType command type
* @return insert result code * @return insert result code
*/ */
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) { private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode,
int processVersion, CommandType commandType, String startParams) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
//To add startParams only when repeat running is needed //To add startParams only when repeat running is needed
@ -607,8 +632,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*/ */
if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) { if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName()); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName());
logger.info("not release process definition id: {} , name : {}", logger.info("not release process definition id: {} , name : {}", processDefinitionTmp.getId(),
processDefinitionTmp.getId(), processDefinitionTmp.getName()); processDefinitionTmp.getName());
return result; return result;
} }
} }
@ -636,12 +661,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param environmentCode environmentCode * @param environmentCode environmentCode
* @return command id * @return command id
*/ */
private int createCommand(CommandType commandType, long processDefineCode, private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep,
TaskDependType nodeDep, FailureStrategy failureStrategy, FailureStrategy failureStrategy, String startNodeList, String schedule,
String startNodeList, String schedule, WarningType warningType, WarningType warningType, int executorId, int warningGroupId, RunMode runMode,
int executorId, int warningGroupId, Priority processInstancePriority, String workerGroup, Long environmentCode,
RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun,
Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) { ComplementDependentMode complementDependentMode) {
/** /**
* instantiate command schedule instance * instantiate command schedule instance
@ -689,11 +714,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (schedule == null || StringUtils.isEmpty(schedule)) { if (schedule == null || StringUtils.isEmpty(schedule)) {
return 0; return 0;
} }
int check = checkScheduleTime(schedule); if (!isValidateScheduleTime(schedule)) {
if(check == 0){ return 0;
}
try {
return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber,
complementDependentMode);
} catch (CronParseException cronParseException) {
// this just make compile happy, since we already validate the cron before
logger.error("Parse cron error", cronParseException);
return 0; return 0;
} }
return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, complementDependentMode);
} else { } else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command); return processService.createCommand(command);
@ -709,7 +740,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return * @return
*/ */
protected int createComplementCommandList(String scheduleTimeParam, RunMode runMode, Command command, protected int createComplementCommandList(String scheduleTimeParam, RunMode runMode, Command command,
Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) { Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode)
throws CronParseException {
int createCount = 0; int createCount = 0;
String startDate = null; String startDate = null;
String endDate = null; String endDate = null;
@ -718,29 +751,31 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam()); Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam); Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ if (scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
dateList = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); dateList = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
dateList = removeDuplicates(dateList); dateList = removeDuplicates(dateList);
} }
if(scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){ if (scheduleParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey(
CMDPARAM_COMPLEMENT_DATA_END_DATE)) {
startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); startDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); endDate = scheduleParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
} }
switch (runMode) { switch (runMode) {
case RUN_MODE_SERIAL: { case RUN_MODE_SERIAL: {
if(StringUtils.isNotEmpty(dateList)){ if (StringUtils.isNotEmpty(dateList)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
createCount = processService.createCommand(command); createCount = processService.createCommand(command);
} }
if(startDate != null && endDate != null){ if (startDate != null && endDate != null) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
createCount = processService.createCommand(command); createCount = processService.createCommand(command);
// dependent process definition // dependent process definition
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
@ -752,10 +787,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break; break;
} }
case RUN_MODE_PARALLEL: { case RUN_MODE_PARALLEL: {
if(startDate != null && endDate != null){ if (startDate != null && endDate != null) {
List<Date> listDate = new ArrayList<>(); List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); command.getProcessDefinitionCode());
listDate.addAll(CronUtils.getSelfFireDateList(DateUtils.getScheduleDate(startDate), DateUtils.getScheduleDate(endDate), schedules)); List<ZonedDateTime> listDate = new ArrayList<>(
CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate),
DateUtils.stringToZoneDateTime(endDate), schedules));
int listDateSize = listDate.size(); int listDateSize = listDate.size();
createCount = listDate.size(); createCount = listDate.size();
if (!CollectionUtils.isEmpty(listDate)) { if (!CollectionUtils.isEmpty(listDate)) {
@ -791,15 +828,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processService.createCommand(command); processService.createCommand(command);
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip " logger.info(
"process code: {} complement dependent in off mode or schedule's size is 0, skip "
+ "dependent complement data", command.getProcessDefinitionCode()); + "dependent complement data", command.getProcessDefinitionCode());
} else { } else {
dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
} }
} }
} }
} }
if(StringUtils.isNotEmpty(dateList)){ if (StringUtils.isNotEmpty(dateList)) {
List<String> listDate = Arrays.asList(dateList.split(COMMA)); List<String> listDate = Arrays.asList(dateList.split(COMMA));
int listDateSize = listDate.size(); int listDateSize = listDate.size();
createCount = listDate.size(); createCount = listDate.size();
@ -823,8 +862,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
default: default:
break; break;
} }
logger.info("create complement command count: {}, create dependent complement command count: {}", createCount logger.info("create complement command count: {}, create dependent complement command count: {}", createCount,
, dependentProcessDefinitionCreateCount); dependentProcessDefinitionCreateCount);
return createCount; return createCount;
} }
@ -844,8 +883,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
List<DependentProcessDefinition> dependentProcessDefinitionList = List<DependentProcessDefinition> dependentProcessDefinitionList =
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
CronUtils.getMaxCycle(schedules.get(0).getCrontab()), CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup());
dependentCommand.getWorkerGroup());
dependentCommand.setTaskDependType(TaskDependType.TASK_POST); dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
@ -869,7 +907,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
List<DependentProcessDefinition> dependentProcessDefinitionList = List<DependentProcessDefinition> dependentProcessDefinitionList =
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
return checkDependentProcessDefinitionValid(dependentProcessDefinitionList,processDefinitionCycle,workerGroup); return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle,
workerGroup);
} }
/** /**
@ -877,20 +916,22 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* the dependent process definition and if there is no worker group in the schedule, use the complement selection's * the dependent process definition and if there is no worker group in the schedule, use the complement selection's
* worker group * worker group
*/ */
private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(List<DependentProcessDefinition> dependentProcessDefinitionList, private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(
CycleEnum processDefinitionCycle, List<DependentProcessDefinition> dependentProcessDefinitionList, CycleEnum processDefinitionCycle,
String workerGroup) { String workerGroup) {
List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>(); List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>();
List<Long> processDefinitionCodeList = dependentProcessDefinitionList.stream() List<Long> processDefinitionCodeList =
.map(DependentProcessDefinition::getProcessDefinitionCode) dependentProcessDefinitionList.stream().map(DependentProcessDefinition::getProcessDefinitionCode)
.collect(Collectors.toList()); .collect(Collectors.toList());
Map<Long, String> processDefinitionWorkerGroupMap = processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList); Map<Long, String> processDefinitionWorkerGroupMap =
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) { if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) {
if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) { if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode())
== null) {
dependentProcessDefinition.setWorkerGroup(workerGroup); dependentProcessDefinition.setWorkerGroup(workerGroup);
} }
@ -902,52 +943,51 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} }
/** /**
*
* @param schedule * @param schedule
* @return check error return 0 otherwish 1 * @return check error return 0 otherwish 1
*/ */
private int checkScheduleTime(String schedule){ private boolean isValidateScheduleTime(String schedule) {
Date start = null; Map<String, String> scheduleResult = JSONUtils.toMap(schedule);
Date end = null; if (scheduleResult == null) {
Map<String,String> scheduleResult = JSONUtils.toMap(schedule); return false;
if(scheduleResult == null){
return 0;
} }
if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ if (scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
if(scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null){ if (scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) == null) {
return 0; return false;
} }
} }
if(scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)){ if (scheduleResult.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
String startDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); String startDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
String endDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); String endDate = scheduleResult.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
if (startDate == null || endDate == null) { if (startDate == null || endDate == null) {
return 0; return false;
} }
start = DateUtils.getScheduleDate(startDate); try {
end = DateUtils.getScheduleDate(endDate); ZonedDateTime start = DateUtils.stringToZoneDateTime(startDate);
if(start == null || end == null){ ZonedDateTime end = DateUtils.stringToZoneDateTime(endDate);
return 0; if (start == null || end == null) {
return false;
} }
if (start.after(end)) { if (start.isAfter(end)) {
logger.error("complement data error, wrong date start:{} and end date:{} ", logger.error("complement data error, wrong date start:{} and end date:{} ", start, end);
start, end return false;
); }
return 0; } catch (Exception ex) {
logger.warn("Parse schedule time error, startDate: {}, endDate: {}", startDate, endDate);
return false;
} }
} }
return 1; return true;
} }
/** /**
*
* @param scheduleTimeList * @param scheduleTimeList
* @return remove duplicate date list * @return remove duplicate date list
*/ */
private String removeDuplicates(String scheduleTimeList){ private String removeDuplicates(String scheduleTimeList) {
HashSet<String> removeDate = new HashSet<String>(); HashSet<String> removeDate = new HashSet<String>();
List<String> resultList = new ArrayList<String>(); List<String> resultList = new ArrayList<String>();
if(StringUtils.isNotEmpty(scheduleTimeList)){ if (StringUtils.isNotEmpty(scheduleTimeList)) {
String[] dateArrays = scheduleTimeList.split(COMMA); String[] dateArrays = scheduleTimeList.split(COMMA);
List<String> dateList = Arrays.asList(dateArrays); List<String> dateList = Arrays.asList(dateArrays);
removeDate.addAll(dateList); removeDate.addAll(dateList);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -560,7 +560,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout, String timezone) { private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout, String timezone) {
Date schedule = processInstance.getScheduleTime(); Date schedule = processInstance.getScheduleTime();
if (scheduleTime != null) { if (scheduleTime != null) {
schedule = DateUtils.getScheduleDate(scheduleTime); schedule = DateUtils.stringToDate(scheduleTime);
} }
processInstance.setScheduleTime(schedule); processInstance.setScheduleTime(schedule);
List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class); List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class);

77
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -46,28 +46,31 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.text.ParseException; import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TimeZone;
import java.util.stream.Collectors;
import org.quartz.CronExpression;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.cronutils.model.Cron;
/** /**
* scheduler service impl * scheduler service impl
@ -138,14 +141,15 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
// check project auth // check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result,null); boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) { if (!hasProjectAndPerm) {
return result; return result;
} }
// check work flow define release state // check work flow define release state
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode); ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
result = executorService.checkProcessDefinitionValid(projectCode,processDefinition, processDefineCode, processDefinition.getVersion()); result = executorService.checkProcessDefinitionValid(projectCode, processDefinition, processDefineCode,
processDefinition.getVersion());
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
@ -238,7 +242,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
// check project auth // check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result,null); boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) { if (!hasProjectAndPerm) {
return result; return result;
} }
@ -281,7 +285,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
// check project auth // check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result,null); boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) { if (!hasProjectAndPerm) {
return result; return result;
} }
@ -313,8 +317,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
if (scheduleStatus == ReleaseState.ONLINE) { if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state // check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}", logger.info("not release process definition id: {} , name : {}", processDefinition.getId(),
processDefinition.getId(), processDefinition.getName()); processDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
return result; return result;
} }
@ -332,7 +336,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) { if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}", logger.info("not release process definition id: {} , name : {}",
subProcessDefinition.getId(), subProcessDefinition.getName()); subProcessDefinition.getId(), subProcessDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId())); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
String.valueOf(subProcessDefinition.getId()));
return result; return result;
} }
} }
@ -407,8 +412,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
} }
Page<Schedule> page = new Page<>(pageNo, pageSize); Page<Schedule> page = new Page<>(pageNo, pageSize);
IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode, IPage<Schedule> scheduleIPage =
searchVal); scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode, searchVal);
List<ScheduleVo> scheduleList = new ArrayList<>(); List<ScheduleVo> scheduleList = new ArrayList<>();
for (Schedule schedule : scheduleIPage.getRecords()) { for (Schedule schedule : scheduleIPage.getRecords()) {
@ -436,7 +441,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
// check project auth // check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result,null); boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) { if (!hasProjectAndPerm) {
return result; return result;
} }
@ -502,7 +507,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode,null); Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
Status resultEnum = (Status) checkResult.get(Constants.STATUS); Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) { if (resultEnum != Status.SUCCESS) {
return checkResult; return checkResult;
@ -516,8 +521,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
} }
// Determine if the login user is the owner of the schedule // Determine if the login user is the owner of the schedule
if (loginUser.getId() != schedule.getUserId() if (loginUser.getId() != schedule.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
&& loginUser.getUserType() != UserType.ADMIN_USER) {
putMsg(result, Status.USER_NO_OPERATION_PERM); putMsg(result, Status.USER_NO_OPERATION_PERM);
return result; return result;
} }
@ -548,24 +552,26 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Override @Override
public Map<String, Object> previewSchedule(User loginUser, String schedule) { public Map<String, Object> previewSchedule(User loginUser, String schedule) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
CronExpression cronExpression; Cron cron;
ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class); ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
Date now = new Date();
Date startTime = DateUtils.transformTimezoneDate(scheduleParam.getStartTime(), scheduleParam.getTimezoneId()); ZoneId zoneId = TimeZone.getTimeZone(scheduleParam.getTimezoneId()).toZoneId();
Date endTime = DateUtils.transformTimezoneDate(scheduleParam.getEndTime(), scheduleParam.getTimezoneId()); ZonedDateTime now = ZonedDateTime.now(zoneId);
startTime = now.after(startTime) ? now : startTime; ZonedDateTime startTime = ZonedDateTime.ofInstant(scheduleParam.getStartTime().toInstant(), zoneId);
ZonedDateTime endTime = ZonedDateTime.ofInstant(scheduleParam.getEndTime().toInstant(), zoneId);
startTime = now.isAfter(startTime) ? now : startTime;
try { try {
cronExpression = CronUtils.parse2CronExpression(scheduleParam.getCrontab()); cron = CronUtils.parse2Cron(scheduleParam.getCrontab());
} catch (ParseException e) { } catch (CronParseException e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR); putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
return result; return result;
} }
List<Date> selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT); List<ZonedDateTime> selfFireDateList =
List<String> previewDateList = new ArrayList<>(); CronUtils.getSelfFireDateList(startTime, endTime, cron, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
selfFireDateList.forEach(date -> previewDateList.add(DateUtils.dateToString(date, scheduleParam.getTimezoneId()))); List<String> previewDateList =
selfFireDateList.stream().map(DateUtils::dateToString).collect(Collectors.toList());
result.put(Constants.DATA_LIST, previewDateList); result.put(Constants.DATA_LIST, previewDateList);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
@ -598,7 +604,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
long environmentCode) { long environmentCode) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
//check user access for project //check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,null); Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
@ -619,17 +625,12 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result; return result;
} }
private void updateSchedule(Map<String, Object> result, private void updateSchedule(Map<String, Object> result, Schedule schedule, ProcessDefinition processDefinition,
Schedule schedule, String scheduleExpression, WarningType warningType, int warningGroupId,
ProcessDefinition processDefinition, FailureStrategy failureStrategy, Priority processInstancePriority, String workerGroup,
String scheduleExpression,
WarningType warningType,
int warningGroupId,
FailureStrategy failureStrategy,
Priority processInstancePriority,
String workerGroup,
long environmentCode) { long environmentCode) {
if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) { if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE,
Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
return; return;
} }

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AuditServiceTest.java

@ -61,8 +61,8 @@ public class AuditServiceTest {
@Test @Test
public void testQueryLogListPaging() { public void testQueryLogListPaging() {
Date start = DateUtils.getScheduleDate("2020-11-01 00:00:00"); Date start = DateUtils.stringToDate("2020-11-01 00:00:00");
Date end = DateUtils.getScheduleDate("2020-11-02 00:00:00"); Date end = DateUtils.stringToDate("2020-11-02 00:00:00");
IPage<AuditLog> page = new Page<>(1, 10); IPage<AuditLog> page = new Page<>(1, 10);
page.setRecords(getLists()); page.setRecords(getLists());

59
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java

@ -17,9 +17,16 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT_OVERVIEW;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import org.apache.dolphinscheduler.api.dto.CommandStateCount; import org.apache.dolphinscheduler.api.dto.CommandStateCount;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl; import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.DataAnalysisServiceImpl; import org.apache.dolphinscheduler.api.service.impl.DataAnalysisServiceImpl;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
@ -39,7 +46,16 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -53,21 +69,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT_OVERVIEW;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
/** /**
* data analysis service test * data analysis service test
*/ */
@ -142,10 +143,14 @@ public class DataAnalysisServiceTest {
Mockito.when(projectMapper.queryByCode(1L)).thenReturn(getProject("test")); Mockito.when(projectMapper.queryByCode(1L)).thenReturn(getProject("test"));
//SUCCESS //SUCCESS
Mockito.when(taskInstanceMapper.countTaskInstanceStateByProjectCodes(DateUtils.getScheduleDate(startDate), Mockito.when(taskInstanceMapper.countTaskInstanceStateByProjectCodes(DateUtils.stringToDate(startDate),
DateUtils.getScheduleDate(endDate), new Long[]{1L})).thenReturn(getTaskInstanceStateCounts()); DateUtils.stringToDate(endDate),
new Long[] {1L})).thenReturn(getTaskInstanceStateCounts());
Mockito.when(projectMapper.selectById(Mockito.any())).thenReturn(getProject("test")); Mockito.when(projectMapper.selectById(Mockito.any())).thenReturn(getProject("test"));
Mockito.when(projectService.hasProjectAndPerm(Mockito.any(), Mockito.any(), (Map<String, Object>)Mockito.any(),Mockito.any())).thenReturn(true); Mockito.when(projectService.hasProjectAndPerm(Mockito.any(),
Mockito.any(),
(Map<String, Object>) Mockito.any(),
Mockito.any())).thenReturn(true);
result = dataAnalysisServiceImpl.countTaskStateByProject(user, 1, startDate, endDate); result = dataAnalysisServiceImpl.countTaskStateByProject(user, 1, startDate, endDate);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
@ -236,18 +241,22 @@ public class DataAnalysisServiceTest {
//checkProject false //checkProject false
Map<String, Object> failResult = new HashMap<>(); Map<String, Object> failResult = new HashMap<>();
putMsg(failResult, Status.PROJECT_NOT_FOUND, 1); putMsg(failResult, Status.PROJECT_NOT_FOUND, 1);
Mockito.when(projectService.checkProjectAndAuth(any(), any(), anyLong(),any())).thenReturn(failResult); Mockito.when(projectService.checkProjectAndAuth(any(), any(), anyLong(), any())).thenReturn(failResult);
failResult = dataAnalysisServiceImpl.countProcessInstanceStateByProject(user, 1, startDate, endDate); failResult = dataAnalysisServiceImpl.countProcessInstanceStateByProject(user, 1, startDate, endDate);
Assert.assertEquals(Status.PROJECT_NOT_FOUND, failResult.get(Constants.STATUS)); Assert.assertEquals(Status.PROJECT_NOT_FOUND, failResult.get(Constants.STATUS));
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, null); putMsg(result, Status.SUCCESS, null);
Mockito.when(projectService.checkProjectAndAuth(any(), any(), anyLong(),any())).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(any(), any(), anyLong(), any())).thenReturn(result);
//SUCCESS //SUCCESS
Mockito.when(processInstanceMapper.countInstanceStateByProjectCodes(DateUtils.getScheduleDate(startDate), Mockito.when(processInstanceMapper.countInstanceStateByProjectCodes(DateUtils.stringToDate(startDate),
DateUtils.getScheduleDate(endDate), new Long[]{1L})).thenReturn(getTaskInstanceStateCounts()); DateUtils.stringToDate(endDate),
Mockito.when(projectService.hasProjectAndPerm(Mockito.any(), Mockito.any(), (Map<String, Object>)Mockito.any(),Mockito.any())).thenReturn(true); new Long[] {1L})).thenReturn(getTaskInstanceStateCounts());
Mockito.when(projectService.hasProjectAndPerm(Mockito.any(),
Mockito.any(),
(Map<String, Object>) Mockito.any(),
Mockito.any())).thenReturn(true);
result = dataAnalysisServiceImpl.countProcessInstanceStateByProject(user, 1, startDate, endDate); result = dataAnalysisServiceImpl.countProcessInstanceStateByProject(user, 1, startDate, endDate);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqExecuteResultServiceTest.java

@ -73,8 +73,8 @@ public class DqExecuteResultServiceTest {
String searchVal = ""; String searchVal = "";
int ruleType = 0; int ruleType = 0;
Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); Date start = DateUtils.stringToDate("2020-01-01 00:00:00");
Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); Date end = DateUtils.stringToDate("2020-01-02 00:00:00");
User loginUser = new User(); User loginUser = new User();
loginUser.setId(1); loginUser.setId(1);

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DqRuleServiceTest.java

@ -128,8 +128,8 @@ public class DqRuleServiceTest {
String searchVal = ""; String searchVal = "";
int ruleType = 0; int ruleType = 0;
Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); Date start = DateUtils.stringToDate("2020-01-01 00:00:00");
Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); Date end = DateUtils.stringToDate("2020-01-02 00:00:00");
User loginUser = new User(); User loginUser = new User();
loginUser.setId(1); loginUser.setId(1);

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -171,8 +171,8 @@ public class ProcessInstanceServiceTest {
"192.168.xx.xx", "",1, 10); "192.168.xx.xx", "",1, 10);
Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) proejctAuthFailRes.getCode()); Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) proejctAuthFailRes.getCode());
Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); Date start = DateUtils.stringToDate("2020-01-01 00:00:00");
Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); Date end = DateUtils.stringToDate("2020-01-02 00:00:00");
ProcessInstance processInstance = getProcessInstance(); ProcessInstance processInstance = getProcessInstance();
List<ProcessInstance> processInstanceList = new ArrayList<>(); List<ProcessInstance> processInstanceList = new ArrayList<>();
Page<ProcessInstance> pageReturn = new Page<>(1, 10); Page<ProcessInstance> pageReturn = new Page<>(1, 10);
@ -246,8 +246,8 @@ public class ProcessInstanceServiceTest {
int size = 10; int size = 10;
String startTime = "2020-01-01 00:00:00"; String startTime = "2020-01-01 00:00:00";
String endTime = "2020-08-02 00:00:00"; String endTime = "2020-08-02 00:00:00";
Date start = DateUtils.getScheduleDate(startTime); Date start = DateUtils.stringToDate(startTime);
Date end = DateUtils.getScheduleDate(endTime); Date end = DateUtils.stringToDate(endTime);
//project auth fail //project auth fail
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);

43
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -99,22 +100,44 @@ public class TaskInstanceServiceTest {
//project auth fail //project auth fail
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result);
Result projectAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 0, "", "", Result projectAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser,
"test_user", "2019-02-26 19:48:00", "2019-02-26 19:48:22", "", null, "", 1, 20); projectCode,
Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int)projectAuthFailRes.getCode()); 0,
"",
"",
"test_user",
"2019-02-26 19:48:00",
"2019-02-26 19:48:22",
"",
null,
"",
1,
20);
Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) projectAuthFailRes.getCode());
// data parameter check // data parameter check
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_INSTANCE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result);
Result dataParameterRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", Result dataParameterRes = taskInstanceService.queryTaskListPaging(loginUser,
"test_user", "20200101 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20); projectCode,
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), (int)dataParameterRes.getCode()); 1,
"",
"",
"test_user",
"20200101 00:00:00",
"2020-01-02 00:00:00",
"",
ExecutionStatus.SUCCESS,
"192.168.xx.xx",
1,
20);
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), (int) dataParameterRes.getCode());
//project //project
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); Date start = DateUtils.stringToDate("2020-01-01 00:00:00");
Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); Date end = DateUtils.stringToDate("2020-01-02 00:00:00");
ProcessInstance processInstance = getProcessInstance(); ProcessInstance processInstance = getProcessInstance();
TaskInstance taskInstance = getTaskInstance(); TaskInstance taskInstance = getTaskInstance();
List<TaskInstance> taskInstanceList = new ArrayList<>(); List<TaskInstance> taskInstanceList = new ArrayList<>();
@ -122,7 +145,7 @@ public class TaskInstanceServiceTest {
taskInstanceList.add(taskInstance); taskInstanceList.add(taskInstance);
pageReturn.setRecords(taskInstanceList); pageReturn.setRecords(taskInstanceList);
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_INSTANCE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result);
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser); when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId()); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1), eq(""), eq(""), eq(""), when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1), eq(""), eq(""), eq(""),

50
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java

@ -31,6 +31,9 @@ import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.TimeZone; import java.util.TimeZone;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -45,7 +48,8 @@ public final class DateUtils {
static final long C6 = C5 * 24L; static final long C6 = C5 * 24L;
private static final Logger logger = LoggerFactory.getLogger(DateUtils.class); private static final Logger logger = LoggerFactory.getLogger(DateUtils.class);
private static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS = DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS); private static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS =
DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS);
private DateUtils() { private DateUtils() {
throw new UnsupportedOperationException("Construct DateUtils"); throw new UnsupportedOperationException("Construct DateUtils");
@ -119,7 +123,8 @@ public final class DateUtils {
} }
public static String format(Date date, DateTimeFormatter dateTimeFormatter, String timezone) { public static String format(Date date, DateTimeFormatter dateTimeFormatter, String timezone) {
LocalDateTime localDateTime = StringUtils.isEmpty(timezone) ? date2LocalDateTime(date) : date2LocalDateTime(date, ZoneId.of(timezone)); LocalDateTime localDateTime =
StringUtils.isEmpty(timezone) ? date2LocalDateTime(date) : date2LocalDateTime(date, ZoneId.of(timezone));
return format(localDateTime, dateTimeFormatter); return format(localDateTime, dateTimeFormatter);
} }
@ -159,6 +164,10 @@ public final class DateUtils {
return format(date, YYYY_MM_DD_HH_MM_SS, timezone); return format(date, YYYY_MM_DD_HH_MM_SS, timezone);
} }
public static String dateToString(ZonedDateTime zonedDateTime) {
return YYYY_MM_DD_HH_MM_SS.format(zonedDateTime);
}
/** /**
* convert string to date and time * convert string to date and time
* *
@ -184,16 +193,35 @@ public final class DateUtils {
return null; return null;
} }
public static ZonedDateTime parseZoneDateTime(@Nonnull String date, @Nonnull DateTimeFormatter dateTimeFormatter,
@Nullable String timezone) {
ZonedDateTime zonedDateTime = ZonedDateTime.parse(date, dateTimeFormatter);
if (StringUtils.isNotEmpty(timezone)) {
return zonedDateTime.withZoneSameInstant(ZoneId.of(timezone));
}
return zonedDateTime;
}
/** /**
* convert date str to yyyy-MM-dd HH:mm:ss format * convert date str to yyyy-MM-dd HH:mm:ss format
* *
* @param date date string * @param date date string
* @return yyyy-MM-dd HH:mm:ss format * @return yyyy-MM-dd HH:mm:ss format
*/ */
public static Date stringToDate(String date) { public static @Nullable Date stringToDate(String date) {
return parse(date, YYYY_MM_DD_HH_MM_SS, null); return parse(date, YYYY_MM_DD_HH_MM_SS, null);
} }
public static ZonedDateTime stringToZoneDateTime(@Nonnull String date) {
Date d = stringToDate(date);
if (d == null) {
throw new IllegalArgumentException(String.format(
"data: %s should be a validate data string - yyyy-MM-dd HH:mm:ss ",
date));
}
return ZonedDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault());
}
/** /**
* convert date str to yyyy-MM-dd HH:mm:ss format * convert date str to yyyy-MM-dd HH:mm:ss format
* *
@ -267,16 +295,6 @@ public final class DateUtils {
return future.getTime() > old.getTime(); return future.getTime() > old.getTime();
} }
/**
* convert schedule string to date
*
* @param schedule schedule
* @return convert schedule string to date
*/
public static Date getScheduleDate(String schedule) {
return stringToDate(schedule);
}
/** /**
* format time to readable * format time to readable
* *
@ -554,8 +572,10 @@ public final class DateUtils {
return date; return date;
} }
String dateToString = dateToString(date, sourceTimezoneId); String dateToString = dateToString(date, sourceTimezoneId);
LocalDateTime localDateTime = LocalDateTime.parse(dateToString, DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS)); LocalDateTime localDateTime =
ZonedDateTime zonedDateTime = ZonedDateTime.of(localDateTime, TimeZone.getTimeZone(targetTimezoneId).toZoneId()); LocalDateTime.parse(dateToString, DateTimeFormatter.ofPattern(Constants.YYYY_MM_DD_HH_MM_SS));
ZonedDateTime zonedDateTime =
ZonedDateTime.of(localDateTime, TimeZone.getTimeZone(targetTimezoneId).toZoneId());
return Date.from(zonedDateTime.toInstant()); return Date.from(zonedDateTime.toInstant());
} }

83
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -76,7 +76,8 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
@ -323,8 +324,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
public String getKey() { public String getKey() {
if (StringUtils.isNotEmpty(key) if (StringUtils.isNotEmpty(key) || this.processDefinition == null) {
|| this.processDefinition == null) {
return key; return key;
} }
@ -443,8 +443,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
this.stateEvents.add(nextEvent); this.stateEvents.add(nextEvent);
} else { } else {
ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
} }
} }
} }
@ -468,13 +467,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) { if (!taskInstance.retryTaskIntervalOverTime()) {
logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", processInstance.getId(), newTaskInstance.getTaskCode(),
processInstance.getId(), newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(), newTaskInstance.getRetryInterval());
newTaskInstance.getTaskCode(),
newTaskInstance.getState(),
newTaskInstance.getRetryTimes(),
newTaskInstance.getMaxRetryTimes(),
newTaskInstance.getRetryInterval());
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance); stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance); stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
} else { } else {
@ -490,8 +484,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
public void refreshProcessInstance(int processInstanceId) { public void refreshProcessInstance(int processInstanceId) {
logger.info("process instance update: {}", processInstanceId); logger.info("process instance update: {}", processInstanceId);
processInstance = processService.findProcessInstanceById(processInstanceId); processInstance = processService.findProcessInstanceById(processInstanceId);
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition); processInstance.setProcessDefinition(processDefinition);
} }
@ -612,10 +605,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
// complement data ends || no success // complement data ends || no success
return true; return true;
} }
logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", processInstance.getId(), processInstance.getScheduleTime(), complementListDate);
processInstance.getId(),
processInstance.getScheduleTime(),
complementListDate);
scheduleDate = complementListDate.get(index + 1); scheduleDate = complementListDate.get(index + 1);
} }
//the next process complement //the next process complement
@ -663,8 +653,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
private boolean needComplementProcess() { private boolean needComplementProcess() {
if (processInstance.isComplementData() if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
&& Flag.NO == processInstance.getIsSubProcess()) {
return true; return true;
} }
return false; return false;
@ -719,11 +708,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
public void checkSerialProcess(ProcessDefinition processDefinition) { public void checkSerialProcess(ProcessDefinition processDefinition) {
int nextInstanceId = processInstance.getNextProcessInstanceId(); int nextInstanceId = processInstance.getNextProcessInstanceId();
if (nextInstanceId == 0) { if (nextInstanceId == 0) {
ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode(), processInstance.getId()); ProcessInstance nextProcessInstance =
this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode(), processInstance.getId());
if (nextProcessInstance == null) { if (nextProcessInstance == null) {
return; return;
} }
ProcessInstance nextReadyStopProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.READY_STOP.getCode(), processInstance.getId()); ProcessInstance nextReadyStopProcessInstance =
this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.READY_STOP.getCode(), processInstance.getId());
if (processDefinition.getExecutionType().typeIsSerialPriority() && nextReadyStopProcessInstance != null) { if (processDefinition.getExecutionType().typeIsSerialPriority() && nextReadyStopProcessInstance != null) {
return; return;
} }
@ -753,8 +744,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (this.dag != null) { if (this.dag != null) {
return; return;
} }
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition); processInstance.setProcessDefinition(processDefinition);
List<TaskInstance> recoverNodeList = getRecoverTaskInstanceList(processInstance.getCommandParam()); List<TaskInstance> recoverNodeList = getRecoverTaskInstanceList(processInstance.getCommandParam());
@ -773,8 +763,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
// generate process to get DAG info // generate process to get DAG info
List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList); List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(taskNodeList, ProcessDag processDag = generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
if (processDag == null) { if (processDag == null) {
logger.error("processDag is null"); logger.error("processDag is null");
return; return;
@ -786,7 +775,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* init task queue * init task queue
*/ */
private void initTaskQueue() throws StateEventHandleException { private void initTaskQueue() throws StateEventHandleException, CronParseException {
taskFailedSubmit = false; taskFailedSubmit = false;
activeTaskProcessorMaps.clear(); activeTaskProcessorMaps.clear();
@ -856,15 +845,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
complementListDate = CronUtils.getSelfScheduleDateList(cmdParam); complementListDate = CronUtils.getSelfScheduleDateList(cmdParam);
} }
logger.info(" process definition code:{} complement data: {}", logger.info(" process definition code:{} complement data: {}", processInstance.getProcessDefinitionCode(), complementListDate);
processInstance.getProcessDefinitionCode(), complementListDate);
if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) { if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0)); processInstance.setScheduleTime(complementListDate.get(0));
String globalParams = curingParamsService.curingGlobalParams(processInstance.getId(), String globalParams =
processDefinition.getGlobalParamMap(), curingParamsService.curingGlobalParams(processInstance.getId(), processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA,
processDefinition.getGlobalParamList(), processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE));
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE));
processInstance.setGlobalParams(globalParams); processInstance.setGlobalParams(globalParams);
processService.updateProcessInstance(processInstance); processService.updateProcessInstance(processInstance);
} }
@ -887,16 +874,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
taskProcessor.init(taskInstance, processInstance); taskProcessor.init(taskInstance, processInstance);
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance); notifyProcessHostUpdate(taskInstance);
} }
boolean submit = taskProcessor.action(TaskAction.SUBMIT); boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) { if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName());
processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
return Optional.empty(); return Optional.empty();
} }
@ -1373,11 +1357,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* @return ExecutionStatus * @return ExecutionStatus
*/ */
private ExecutionStatus runningState(ExecutionStatus state) { private ExecutionStatus runningState(ExecutionStatus state) {
if (state == ExecutionStatus.READY_STOP if (state == ExecutionStatus.READY_STOP || state == ExecutionStatus.READY_PAUSE || state == ExecutionStatus.WAITING_THREAD || state == ExecutionStatus.READY_BLOCK ||
|| state == ExecutionStatus.READY_PAUSE state == ExecutionStatus.DELAY_EXECUTION) {
|| state == ExecutionStatus.WAITING_THREAD
|| state == ExecutionStatus.READY_BLOCK
|| state == ExecutionStatus.DELAY_EXECUTION) {
// if the running task is not completed, the state remains unchanged // if the running task is not completed, the state remains unchanged
return state; return state;
} else { } else {
@ -1412,9 +1393,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return true; return true;
} }
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
return readyToSubmitTaskQueue.size() == 0 return readyToSubmitTaskQueue.size() == 0 && activeTaskProcessorMaps.size() == 0 && waitToRetryTaskInstanceMap.size() == 0;
&& activeTaskProcessorMaps.size() == 0
&& waitToRetryTaskInstanceMap.size() == 0;
} }
} }
return false; return false;
@ -1444,10 +1423,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE); List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
if (CollectionUtils.isNotEmpty(pauseList) if (CollectionUtils.isNotEmpty(pauseList) || processInstance.isBlocked() || !isComplementEnd() || readyToSubmitTaskQueue.size() > 0) {
|| processInstance.isBlocked()
|| !isComplementEnd()
|| readyToSubmitTaskQueue.size() > 0) {
return ExecutionStatus.PAUSE; return ExecutionStatus.PAUSE;
} else { } else {
return ExecutionStatus.SUCCESS; return ExecutionStatus.SUCCESS;
@ -1511,10 +1487,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP); List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL); List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
List<TaskInstance> failList = getCompleteTaskByState(ExecutionStatus.FAILURE); List<TaskInstance> failList = getCompleteTaskByState(ExecutionStatus.FAILURE);
if (CollectionUtils.isNotEmpty(stopList) if (CollectionUtils.isNotEmpty(stopList) || CollectionUtils.isNotEmpty(killList) || CollectionUtils.isNotEmpty(failList) || !isComplementEnd()) {
|| CollectionUtils.isNotEmpty(killList)
|| CollectionUtils.isNotEmpty(failList)
|| !isComplementEnd()) {
return ExecutionStatus.STOP; return ExecutionStatus.STOP;
} else { } else {
return ExecutionStatus.SUCCESS; return ExecutionStatus.SUCCESS;
@ -1555,7 +1528,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); Date endTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
return processInstance.getScheduleTime().equals(endTime); return processInstance.getScheduleTime().equals(endTime);
} }

18
dolphinscheduler-service/pom.xml

@ -55,24 +55,6 @@
<groupId>com.cronutils</groupId> <groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId> <artifactId>cron-utils</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<exclusions>
<exclusion>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
</exclusion>
<exclusion>
<groupId>com.mchange</groupId>
<artifactId>mchange-commons-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>io.micrometer</groupId> <groupId>io.micrometer</groupId>

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/AbstractCycle.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/AbstractCycle.java

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.corn; package org.apache.dolphinscheduler.service.cron;
import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.CycleEnum;

161
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CronUtils.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java

@ -15,16 +15,16 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.corn; package org.apache.dolphinscheduler.service.cron;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.COMMA; import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.service.corn.CycleFactory.day; import static org.apache.dolphinscheduler.service.cron.CycleFactory.day;
import static org.apache.dolphinscheduler.service.corn.CycleFactory.hour; import static org.apache.dolphinscheduler.service.cron.CycleFactory.hour;
import static org.apache.dolphinscheduler.service.corn.CycleFactory.min; import static org.apache.dolphinscheduler.service.cron.CycleFactory.min;
import static org.apache.dolphinscheduler.service.corn.CycleFactory.month; import static org.apache.dolphinscheduler.service.cron.CycleFactory.month;
import static org.apache.dolphinscheduler.service.corn.CycleFactory.week; import static org.apache.dolphinscheduler.service.cron.CycleFactory.week;
import static org.apache.dolphinscheduler.service.corn.CycleFactory.year; import static org.apache.dolphinscheduler.service.cron.CycleFactory.year;
import static com.cronutils.model.CronType.QUARTZ; import static com.cronutils.model.CronType.QUARTZ;
@ -33,27 +33,32 @@ import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import java.text.ParseException; import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.GregorianCalendar; import java.util.GregorianCalendar;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.quartz.CronExpression;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.cronutils.model.Cron; import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder; import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser; import com.cronutils.parser.CronParser;
import lombok.NonNull;
/** /**
* // todo: this utils is heavy, it rely on quartz and corn-utils. * // todo: this utils is heavy, it rely on quartz and corn-utils.
* cron utils * cron utils
@ -74,19 +79,12 @@ public class CronUtils {
* @param cronExpression cron expression, never null * @param cronExpression cron expression, never null
* @return Cron instance, corresponding to cron expression received * @return Cron instance, corresponding to cron expression received
*/ */
public static Cron parse2Cron(String cronExpression) { public static Cron parse2Cron(String cronExpression) throws CronParseException {
try {
return QUARTZ_CRON_PARSER.parse(cronExpression); return QUARTZ_CRON_PARSER.parse(cronExpression);
} catch (Exception ex) {
throw new CronParseException(String.format("Parse corn expression: [%s] error", cronExpression), ex);
} }
/**
* build a new CronExpression based on the string cronExpression
*
* @param cronExpression String representation of the cron expression the new object should represent
* @return CronExpression
* @throws ParseException if the string expression cannot be parsed into a valid
*/
public static CronExpression parse2CronExpression(String cronExpression) throws ParseException {
return new CronExpression(cronExpression);
} }
/** /**
@ -106,7 +104,12 @@ public class CronUtils {
* @return CycleEnum * @return CycleEnum
*/ */
public static CycleEnum getMiniCycle(Cron cron) { public static CycleEnum getMiniCycle(Cron cron) {
return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron)).addCycle(year(cron)).getMiniCycle(); return min(cron).addCycle(hour(cron))
.addCycle(day(cron))
.addCycle(week(cron))
.addCycle(month(cron))
.addCycle(year(cron))
.getMiniCycle();
} }
/** /**
@ -116,7 +119,18 @@ public class CronUtils {
* @return CycleEnum * @return CycleEnum
*/ */
public static CycleEnum getMaxCycle(String crontab) { public static CycleEnum getMaxCycle(String crontab) {
try {
return getMaxCycle(parse2Cron(crontab)); return getMaxCycle(parse2Cron(crontab));
} catch (CronParseException ex) {
throw new RuntimeException("Get max cycle error", ex);
}
}
public static List<ZonedDateTime> getFireDateList(@NonNull ZonedDateTime startTime,
@NonNull ZonedDateTime endTime,
@NonNull String cron) throws CronParseException {
return getFireDateList(startTime, endTime, parse2Cron(cron));
} }
/** /**
@ -124,15 +138,22 @@ public class CronUtils {
* *
* @param startTime startTime * @param startTime startTime
* @param endTime endTime * @param endTime endTime
* @param cronExpression cronExpression * @param cron cron
* @return date list * @return date list
*/ */
public static List<Date> getFireDateList(Date startTime, Date endTime, CronExpression cronExpression) { public static List<ZonedDateTime> getFireDateList(@NonNull ZonedDateTime startTime,
List<Date> dateList = new ArrayList<>(); @NonNull ZonedDateTime endTime,
@NonNull Cron cron) {
List<ZonedDateTime> dateList = new ArrayList<>();
ExecutionTime executionTime = ExecutionTime.forCron(cron);
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
startTime = cronExpression.getNextValidTimeAfter(startTime); Optional<ZonedDateTime> nextExecutionTimeOptional = executionTime.nextExecution(startTime);
if (startTime == null || startTime.after(endTime)) { if (!nextExecutionTimeOptional.isPresent()) {
break;
}
startTime = nextExecutionTimeOptional.get();
if (startTime.isAfter(endTime)) {
break; break;
} }
dateList.add(startTime); dateList.add(startTime);
@ -142,64 +163,62 @@ public class CronUtils {
} }
/** /**
* gets expect scheduled times for a period of time based on self dependency * Gets expect scheduled times for a period of time based on self dependency
* *
* @param startTime startTime * @param startTime startTime
* @param endTime endTime * @param endTime endTime
* @param cronExpression cronExpression * @param cron cron
* @param fireTimes fireTimes * @param fireTimes fireTimes
* @return date list * @return nextTime execution list
*/ */
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, CronExpression cronExpression, int fireTimes) { public static List<ZonedDateTime> getSelfFireDateList(@NonNull ZonedDateTime startTime,
List<Date> dateList = new ArrayList<>(); @NonNull ZonedDateTime endTime, @NonNull Cron cron,
int fireTimes) {
List<ZonedDateTime> executeTimes = new ArrayList<>();
ExecutionTime executionTime = ExecutionTime.forCron(cron);
while (fireTimes > 0) { while (fireTimes > 0) {
startTime = cronExpression.getNextValidTimeAfter(startTime); Optional<ZonedDateTime> nextTime = executionTime.nextExecution(startTime);
if (startTime == null || startTime.after(endTime) || startTime.equals(endTime)) { if (!nextTime.isPresent()) {
break; break;
} }
dateList.add(startTime); startTime = nextTime.get();
if (startTime.isAfter(endTime)) {
break;
}
executeTimes.add(startTime);
fireTimes--; fireTimes--;
} }
return executeTimes;
return dateList;
} }
/** public static List<Date> getSelfFireDateList(@NonNull final Date startTime,
* gets all scheduled times for a period of time based on self dependency @NonNull final Date endTime,
* @NonNull final List<Schedule> schedules) throws CronParseException {
* @param startTime startTime ZonedDateTime zonedDateTimeStart = ZonedDateTime.ofInstant(startTime.toInstant(), ZoneId.systemDefault());
* @param endTime endTime ZonedDateTime zonedDateTimeEnd = ZonedDateTime.ofInstant(endTime.toInstant(), ZoneId.systemDefault());
* @param cronExpression cronExpression
* @return date list
*/
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, CronExpression cronExpression) {
List<Date> dateList = new ArrayList<>();
while (Stopper.isRunning()) {
startTime = cronExpression.getNextValidTimeAfter(startTime);
if (startTime == null || startTime.after(endTime) || startTime.equals(endTime)) {
break;
}
dateList.add(startTime);
}
return dateList; return getSelfFireDateList(zonedDateTimeStart, zonedDateTimeEnd, schedules).stream()
.map(zonedDateTime -> new Date(zonedDateTime.toInstant().toEpochMilli()))
.collect(Collectors.toList());
} }
/** /**
* gets all scheduled times for a period of time based on self dependency * gets all scheduled times for a period of time based on self dependency
* if schedulers is empty then default scheduler = 1 day * if schedulers is empty then default scheduler = 1 day
*/ */
public static List<Date> getSelfFireDateList(final Date startTime, final Date endTime, final List<Schedule> schedules) { public static List<ZonedDateTime> getSelfFireDateList(@NonNull final ZonedDateTime startTime,
List<Date> result = new ArrayList<>(); @NonNull final ZonedDateTime endTime,
@NonNull final List<Schedule> schedules)
throws CronParseException {
List<ZonedDateTime> result = new ArrayList<>();
if (startTime.equals(endTime)) { if (startTime.equals(endTime)) {
result.add(startTime); result.add(startTime);
return result; return result;
} }
// support left closed and right closed time interval (startDate <= N <= endDate) // support left closed and right closed time interval (startDate <= N <= endDate)
Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS); ZonedDateTime from = startTime.minusSeconds(1L);
Date to = new Date(endTime.getTime() + Constants.SECOND_TIME_MILLIS); ZonedDateTime to = endTime.plusSeconds(1L);
List<Schedule> listSchedule = new ArrayList<>(); List<Schedule> listSchedule = new ArrayList<>();
listSchedule.addAll(schedules); listSchedule.addAll(schedules);
@ -209,30 +228,11 @@ public class CronUtils {
listSchedule.add(schedule); listSchedule.add(schedule);
} }
for (Schedule schedule : listSchedule) { for (Schedule schedule : listSchedule) {
result.addAll(CronUtils.getSelfFireDateList(from, to, schedule.getCrontab())); result.addAll(CronUtils.getFireDateList(from, to, schedule.getCrontab()));
} }
return result; return result;
} }
/**
* gets all scheduled times for a period of time based on self dependency
*
* @param startTime startTime
* @param endTime endTime
* @param cron cron
* @return date list
*/
public static List<Date> getSelfFireDateList(Date startTime, Date endTime, String cron) {
CronExpression cronExpression = null;
try {
cronExpression = parse2CronExpression(cron);
} catch (ParseException e) {
logger.error(e.getMessage(), e);
return Collections.emptyList();
}
return getSelfFireDateList(startTime, endTime, cronExpression);
}
/** /**
* get expiration time * get expiration time
* *
@ -289,6 +289,7 @@ public class CronUtils {
/** /**
* get Schedule Date * get Schedule Date
*
* @param param * @param param
* @return date list * @return date list
*/ */

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleFactory.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleFactory.java

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.corn; package org.apache.dolphinscheduler.service.cron;
import com.cronutils.model.Cron; import com.cronutils.model.Cron;
import com.cronutils.model.field.expression.Always; import com.cronutils.model.field.expression.Always;

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/corn/CycleLinks.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CycleLinks.java

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.corn; package org.apache.dolphinscheduler.service.cron;
import com.cronutils.model.Cron; import com.cronutils.model.Cron;
import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.CycleEnum;

29
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/CronParseException.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.exceptions;
public class CronParseException extends Exception {
public CronParseException(String message) {
super(message);
}
public CronParseException(String message, Throwable throwable) {
super(message, throwable);
}
}

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.Date; import java.util.Date;
@ -61,7 +62,7 @@ import org.springframework.transaction.annotation.Transactional;
public interface ProcessService { public interface ProcessService {
@Transactional @Transactional
ProcessInstance handleCommand(String host, Command command); ProcessInstance handleCommand(String host, Command command) throws CronParseException;
void moveToErrorCommand(Command command, String message); void moveToErrorCommand(Command command, String message);

41
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import io.micrometer.core.annotation.Counted;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@ -33,6 +31,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
@ -51,7 +51,6 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData; import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;
@ -130,12 +129,14 @@ import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
@ -152,7 +153,6 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -166,6 +166,8 @@ import com.google.common.base.Joiner;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
/** /**
* process relative dao that some mappers in this. * process relative dao that some mappers in this.
*/ */
@ -277,14 +279,13 @@ public class ProcessServiceImpl implements ProcessService {
/** /**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction * handle Command (construct ProcessInstance from Command) , wrapped in transaction
* *
* @param logger logger
* @param host host * @param host host
* @param command found command * @param command found command
* @return process instance * @return process instance
*/ */
@Override @Override
@Transactional @Transactional
public ProcessInstance handleCommand(String host, Command command) { public ProcessInstance handleCommand(String host, Command command) throws CronParseException {
ProcessInstance processInstance = constructProcessInstance(command, host); ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null // cannot construct process instance, return null
if (processInstance == null) { if (processInstance == null) {
@ -731,15 +732,14 @@ public class ProcessServiceImpl implements ProcessService {
* @param cmdParam cmdParam map * @param cmdParam cmdParam map
* @return date * @return date
*/ */
private Date getScheduleTime(Command command, Map<String, String> cmdParam) { private Date getScheduleTime(Command command, Map<String, String> cmdParam) throws CronParseException {
Date scheduleTime = command.getScheduleTime(); Date scheduleTime = command.getScheduleTime();
if (scheduleTime == null if (scheduleTime == null && cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
&& cmdParam != null
&& cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
List<Schedule> schedules = queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); List<Schedule> schedules =
queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
List<Date> complementDateList = CronUtils.getSelfFireDateList(start, end, schedules); List<Date> complementDateList = CronUtils.getSelfFireDateList(start, end, schedules);
if (complementDateList.size() > 0) { if (complementDateList.size() > 0) {
@ -922,12 +922,13 @@ public class ProcessServiceImpl implements ProcessService {
* @param host host * @param host host
* @return process instance * @return process instance
*/ */
protected ProcessInstance constructProcessInstance(Command command, String host) { protected ProcessInstance constructProcessInstance(Command command, String host) throws CronParseException {
ProcessInstance processInstance; ProcessInstance processInstance;
ProcessDefinition processDefinition; ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType(); CommandType commandType = command.getCommandType();
processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion()); processDefinition =
this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
if (processDefinition == null) { if (processDefinition == null) {
logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode()); logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
return null; return null;
@ -1122,7 +1123,7 @@ public class ProcessServiceImpl implements ProcessService {
*/ */
private void initComplementDataParam(ProcessDefinition processDefinition, private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance, ProcessInstance processInstance,
Map<String, String> cmdParam) { Map<String, String> cmdParam) throws CronParseException {
if (!processInstance.isComplementData()) { if (!processInstance.isComplementData()) {
return; return;
} }
@ -1130,16 +1131,16 @@ public class ProcessServiceImpl implements ProcessService {
Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
List<Date> complementDate = Lists.newLinkedList(); List<Date> complementDate = Lists.newLinkedList();
if(start != null && end != null){ if (start != null && end != null) {
List<Schedule> listSchedules = queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); List<Schedule> listSchedules =
queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
complementDate = CronUtils.getSelfFireDateList(start, end, listSchedules); complementDate = CronUtils.getSelfFireDateList(start, end, listSchedules);
} }
if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
complementDate = CronUtils.getSelfScheduleDateList(cmdParam); complementDate = CronUtils.getSelfScheduleDateList(cmdParam);
} }
if (complementDate.size() > 0 if (complementDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) {
&& Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementDate.get(0)); processInstance.setScheduleTime(complementDate.get(0));
} }

99
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cron/CronUtilsTest.java

@ -24,9 +24,10 @@ import static com.cronutils.model.field.expression.FieldExpressionFactory.questi
import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import java.text.ParseException; import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date; import java.util.Date;
import org.junit.Assert; import org.junit.Assert;
@ -59,15 +60,9 @@ public class CronUtilsTest {
*/ */
@Test @Test
public void testCronAsString() { public void testCronAsString() {
Cron cron = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) Cron cron = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)).withYear(always())
.withYear(always()) .withDoW(questionMark()).withMonth(always()).withDoM(always()).withHour(always()).withMinute(every(5))
.withDoW(questionMark()) .withSecond(on(0)).instance();
.withMonth(always())
.withDoM(always())
.withHour(always())
.withMinute(every(5))
.withSecond(on(0))
.instance();
// Obtain the string expression // Obtain the string expression
String cronAsString = cron.asString(); String cronAsString = cron.asString();
@ -78,11 +73,9 @@ public class CronUtilsTest {
/** /**
* cron parse test * cron parse test
*
* @throws ParseException if error throws ParseException
*/ */
@Test @Test
public void testCronParse() throws ParseException { public void testCronParse() throws CronParseException {
String strCrontab = "0 1 2 3 * ? *"; String strCrontab = "0 1 2 3 * ? *";
Cron depCron = CronUtils.parse2Cron(strCrontab); Cron depCron = CronUtils.parse2Cron(strCrontab);
@ -96,11 +89,9 @@ public class CronUtilsTest {
/** /**
* schedule type test * schedule type test
*
* @throws ParseException if error throws ParseException
*/ */
@Test @Test
public void testScheduleType() throws ParseException { public void testScheduleType() throws CronParseException {
CycleEnum cycleEnum = CronUtils.getMaxCycle(CronUtils.parse2Cron("0 */1 * * * ? *")); CycleEnum cycleEnum = CronUtils.getMaxCycle(CronUtils.parse2Cron("0 */1 * * * ? *"));
Assert.assertEquals("MINUTE", cycleEnum.name()); Assert.assertEquals("MINUTE", cycleEnum.name());
@ -129,23 +120,15 @@ public class CronUtilsTest {
* test * test
*/ */
@Test @Test
public void test2() { public void test2() throws CronParseException {
Cron cron1 = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) Cron cron1 = CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)).withYear(always())
.withYear(always()) .withDoW(questionMark()).withMonth(always()).withDoM(always()).withHour(always()).withMinute(every(5))
.withDoW(questionMark()) .withSecond(on(0)).instance();
.withMonth(always())
.withDoM(always())
.withHour(always())
.withMinute(every(5))
.withSecond(on(0))
.instance();
// minute cycle // minute cycle
String[] cronArayy = new String[] {"* * * * * ? *", "* 0 * * * ? *", String[] cronArayy =
"* 5 * * 3/5 ? *", "0 0 * * * ? *", "0 0 7 * 1 ? *", "0 0 7 * 1/1 ? *", "0 0 7 * 1-2 ? *", "0 0 7 * 1,2 ? *"}; new String[] {"* * * * * ? *", "* 0 * * * ? *", "* 5 * * 3/5 ? *", "0 0 * * * ? *", "0 0 7 * 1 ? *",
"0 0 7 * 1/1 ? *", "0 0 7 * 1-2 ? *", "0 0 7 * 1,2 ? *"};
for (String minCrontab : cronArayy) { for (String minCrontab : cronArayy) {
if (!org.quartz.CronExpression.isValidExpression(minCrontab)) {
throw new RuntimeException(minCrontab + " verify failure, cron expression not valid");
}
Cron cron = CronUtils.parse2Cron(minCrontab); Cron cron = CronUtils.parse2Cron(minCrontab);
CronField minField = cron.retrieve(CronFieldName.MINUTE); CronField minField = cron.retrieve(CronFieldName.MINUTE);
logger.info("minField instanceof Between:" + (minField.getExpression() instanceof Between)); logger.info("minField instanceof Between:" + (minField.getExpression() instanceof Between));
@ -166,7 +149,8 @@ public class CronUtilsTest {
logger.info("dayOfMonthField instanceof Every:" + (dayOfMonthField.getExpression() instanceof Every)); logger.info("dayOfMonthField instanceof Every:" + (dayOfMonthField.getExpression() instanceof Every));
logger.info("dayOfMonthField instanceof On:" + (dayOfMonthField.getExpression() instanceof On)); logger.info("dayOfMonthField instanceof On:" + (dayOfMonthField.getExpression() instanceof On));
logger.info("dayOfMonthField instanceof And:" + (dayOfMonthField.getExpression() instanceof And)); logger.info("dayOfMonthField instanceof And:" + (dayOfMonthField.getExpression() instanceof And));
logger.info("dayOfMonthField instanceof QuestionMark:" + (dayOfMonthField.getExpression() instanceof QuestionMark)); logger.info(
"dayOfMonthField instanceof QuestionMark:" + (dayOfMonthField.getExpression() instanceof QuestionMark));
CronField monthField = cron.retrieve(CronFieldName.MONTH); CronField monthField = cron.retrieve(CronFieldName.MONTH);
logger.info("monthField instanceof Between:" + (monthField.getExpression() instanceof Between)); logger.info("monthField instanceof Between:" + (monthField.getExpression() instanceof Between));
@ -182,7 +166,8 @@ public class CronUtilsTest {
logger.info("dayOfWeekField instanceof Every:" + (dayOfWeekField.getExpression() instanceof Every)); logger.info("dayOfWeekField instanceof Every:" + (dayOfWeekField.getExpression() instanceof Every));
logger.info("dayOfWeekField instanceof On:" + (dayOfWeekField.getExpression() instanceof On)); logger.info("dayOfWeekField instanceof On:" + (dayOfWeekField.getExpression() instanceof On));
logger.info("dayOfWeekField instanceof And:" + (dayOfWeekField.getExpression() instanceof And)); logger.info("dayOfWeekField instanceof And:" + (dayOfWeekField.getExpression() instanceof And));
logger.info("dayOfWeekField instanceof QuestionMark:" + (dayOfWeekField.getExpression() instanceof QuestionMark)); logger.info(
"dayOfWeekField instanceof QuestionMark:" + (dayOfWeekField.getExpression() instanceof QuestionMark));
CronField yearField = cron.retrieve(CronFieldName.YEAR); CronField yearField = cron.retrieve(CronFieldName.YEAR);
logger.info("yearField instanceof Between:" + (yearField.getExpression() instanceof Between)); logger.info("yearField instanceof Between:" + (yearField.getExpression() instanceof Between));
@ -203,27 +188,39 @@ public class CronUtilsTest {
} }
@Test @Test
public void getSelfFireDateList() throws ParseException { public void getSelfFireDateList() throws CronParseException {
Date from = DateUtils.stringToDate("2020-01-01 00:00:00"); ZonedDateTime from =
Date to = DateUtils.stringToDate("2020-01-31 00:00:00"); ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:00:00").toInstant(), ZoneId.systemDefault());
ZonedDateTime to =
ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-31 00:00:00").toInstant(), ZoneId.systemDefault());
// test date // test date
Assert.assertEquals(0, CronUtils.getSelfFireDateList(to, from, "0 0 0 * * ? ").size()); Assert.assertEquals(0, CronUtils.getFireDateList(to, from, "0 0 0 * * ? ").size());
try {
// test error cron // test error cron
Assert.assertEquals(0, CronUtils.getSelfFireDateList(from, to, "0 0 0 * *").size()); // should throw exception
CronUtils.getFireDateList(from, to, "0 0 0 * *").size();
Assert.assertTrue(false);
} catch (CronParseException cronParseException) {
Assert.assertTrue(true);
}
// test cron // test cron
Assert.assertEquals(29, CronUtils.getSelfFireDateList(from, to, "0 0 0 * * ? ").size()); Assert.assertEquals(30, CronUtils.getFireDateList(from, to, "0 0 0 * * ? ").size());
// test other // test other
Assert.assertEquals(30, CronUtils.getFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? ")).size()); Assert.assertEquals(30, CronUtils.getFireDateList(from, to, CronUtils.parse2Cron("0 0 0 * * ? ")).size());
Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? "), 5).size()); Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to, CronUtils.parse2Cron("0 0 0 * * ? "), 5).size());
from = DateUtils.stringToDate("2020-01-01 00:02:00"); from =
to = DateUtils.stringToDate("2020-01-01 00:02:00"); ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:02:00").toInstant(), ZoneId.systemDefault());
Assert.assertEquals(1, CronUtils.getFireDateList(new Date(from.getTime() - 1000), to, CronUtils.parse2CronExpression("0 * * * * ? ")).size()); to = ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:02:00").toInstant(), ZoneId.systemDefault());
Assert.assertEquals(1,
from = DateUtils.stringToDate("2020-01-01 00:02:00"); CronUtils.getFireDateList(from.minusSeconds(1L), to, CronUtils.parse2Cron("0 * * * * ? ")).size());
to = DateUtils.stringToDate("2020-01-01 00:04:00");
Assert.assertEquals(2, CronUtils.getFireDateList(new Date(from.getTime() - 1000), from =
new Date(to.getTime() - 1000), ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:02:00").toInstant(), ZoneId.systemDefault());
CronUtils.parse2CronExpression("0 * * * * ? ")).size()); to = ZonedDateTime.ofInstant(DateUtils.stringToDate("2020-01-01 00:04:00").toInstant(),
ZoneId.systemDefault());
Assert.assertEquals(2,
CronUtils.getFireDateList(from.minusSeconds(1L), to.minusSeconds(1L), CronUtils.parse2Cron("0 * * * * ? "))
.size());
} }
@Test @Test

5
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
@ -284,7 +285,7 @@ public class ProcessServiceTest {
} }
@Test @Test
public void testHandleCommand() { public void testHandleCommand() throws CronParseException {
//cannot construct process instance, return null; //cannot construct process instance, return null;
String host = "127.0.0.1"; String host = "127.0.0.1";
@ -461,7 +462,7 @@ public class ProcessServiceTest {
} }
@Test(expected = ServiceException.class) @Test(expected = ServiceException.class)
public void testDeleteNotExistCommand() { public void testDeleteNotExistCommand() throws CronParseException {
String host = "127.0.0.1"; String host = "127.0.0.1";
int definitionVersion = 1; int definitionVersion = 1;
long definitionCode = 123; long definitionCode = 123;

6
dolphinscheduler-worker/pom.xml

@ -34,12 +34,6 @@
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId> <artifactId>dolphinscheduler-service</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>

Loading…
Cancel
Save