Browse Source

[Improvement-6474] [MasterServer] schedule time for process instance optimization (#6477)

* [DS-6474][MasterServer] change to handle schedule time for process instance in WorkflowExecuteThread

* delete all the valid tasks when complement data if id is not null

* checkstyle

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
e091801e05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  2. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -575,6 +575,15 @@ public class WorkflowExecuteThread implements Runnable {
complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
logger.info(" process definition code:{} complement data: {}",
processInstance.getProcessDefinitionCode(), complementListDate.toString());
if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0));
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
processService.updateProcessInstance(processInstance);
}
}
}
}

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -125,7 +125,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.facebook.presto.jdbc.internal.guava.collect.Lists;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
@ -630,10 +629,8 @@ public class ProcessService {
processInstance.setWarningGroupId(warningGroupId);
processInstance.setDryRun(command.getDryRun());
// schedule time
Date scheduleTime = getScheduleTime(command, cmdParam);
if (scheduleTime != null) {
processInstance.setScheduleTime(scheduleTime);
if (command.getScheduleTime() != null) {
processInstance.setScheduleTime(command.getScheduleTime());
}
processInstance.setCommandStartTime(command.getStartTime());
processInstance.setLocations(processDefinition.getLocations());
@ -878,13 +875,14 @@ public class ProcessService {
runStatus = processInstance.getState();
break;
case COMPLEMENT_DATA:
// delete all the valid tasks when complement data
List<TaskInstance> taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance taskInstance : taskInstanceList) {
taskInstance.setFlag(Flag.NO);
this.updateTaskInstance(taskInstance);
// delete all the valid tasks when complement data if id is not null
if (processInstance.getId() != 0) {
List<TaskInstance> taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance taskInstance : taskInstanceList) {
taskInstance.setFlag(Flag.NO);
this.updateTaskInstance(taskInstance);
}
}
initComplementDataParam(processDefinition, processInstance, cmdParam);
break;
case REPEAT_RUNNING:
// delete the recover task names from command parameter

Loading…
Cancel
Save