Browse Source

fix bug #6347 the first schedule_time is error in complement data (#6351)

2.0.7-release
OS 3 years ago committed by GitHub
parent
commit
20a3741b24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  2. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  3. 43
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -187,7 +187,7 @@ public class MasterSchedulerService extends Thread {
if (processInstance.getTimeout() > 0) {
this.processTimeoutCheckList.put(processInstance.getId(), processInstance);
}
logger.info("command {} process {} start...",
logger.info("handle command end, command {} process {} start...",
command.getId(), processInstance.getId());
masterExecService.execute(workflowExecuteThread);
}

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -438,9 +438,14 @@ public class WorkflowExecuteThread implements Runnable {
endProcess();
int index = complementListDate.indexOf(scheduleDate);
if (index >= complementListDate.size() - 1 || !processInstance.getState().typeIsSuccess()) {
logger.info("process complement end. process id:{}", processInstance.getId());
// complement data ends || no success
return false;
}
logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}",
processInstance.getId(),
processInstance.getScheduleTime(),
complementListDate.toString());
scheduleDate = complementListDate.get(index + 1);
//the next process complement
processInstance.setId(0);
@ -555,16 +560,9 @@ public class WorkflowExecuteThread implements Runnable {
}
if (complementListDate.size() == 0 && needComplementProcess()) {
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
if (startDate.after(endDate)) {
Date tmp = startDate;
startDate = endDate;
endDate = tmp;
}
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
complementListDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedules));
complementListDate = processService.getComplementDateList(
JSONUtils.toMap(processInstance.getCommandParam()),
processInstance.getProcessDefinitionCode());
logger.info(" process definition code:{} complement data: {}",
processInstance.getProcessDefinitionCode(), complementListDate.toString());
}

43
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -104,6 +104,7 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.commons.lang.StringUtils;
@ -583,8 +584,16 @@ public class ProcessService {
*/
private Date getScheduleTime(Command command, Map<String, String> cmdParam) {
Date scheduleTime = command.getScheduleTime();
if (scheduleTime == null && cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
if (scheduleTime == null
&& cmdParam != null
&& cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
List<Date> complementDateList = getComplementDateList(cmdParam, command.getProcessDefinitionCode());
if (complementDateList.size() > 0) {
scheduleTime = complementDateList.get(0);
} else {
logger.error("set scheduler time error: complement date list is empty, command: {}",
command.toString());
}
}
return scheduleTime;
}
@ -963,10 +972,11 @@ public class ProcessService {
return;
}
Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE),
YYYY_MM_DD_HH_MM_SS);
if (Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(startComplementTime);
List<Date> complementDate = getComplementDateList(cmdParam, processInstance.getProcessDefinitionCode());
if (complementDate.size() > 0
&& Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementDate.get(0));
}
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
@ -975,6 +985,27 @@ public class ProcessService {
}
/**
* return complement date list
*
* @param cmdParam
* @param processDefinitionCode
* @return
*/
public List<Date> getComplementDateList(Map<String, String> cmdParam, Long processDefinitionCode) {
List<Date> result = new ArrayList<>();
Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
if (startDate.after(endDate)) {
Date tmp = startDate;
startDate = endDate;
endDate = tmp;
}
List<Schedule> schedules = queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
result.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedules));
return result;
}
/**
* set sub work process parameters.
* handle sub work process instance, update relation table and command parameters

Loading…
Cancel
Save