From e091801e05c8e9243ec96f76a2e7cc1f1f5cecab Mon Sep 17 00:00:00 2001 From: wind Date: Mon, 11 Oct 2021 15:13:38 +0800 Subject: [PATCH] [Improvement-6474] [MasterServer] schedule time for process instance optimization (#6477) * [DS-6474][MasterServer] change to handle schedule time for process instance in WorkflowExecuteThread * delete all the valid tasks when complement data if id is not null * checkstyle Co-authored-by: caishunfeng <534328519@qq.com> --- .../master/runner/WorkflowExecuteThread.java | 9 +++++++++ .../service/process/ProcessService.java | 20 +++++++++---------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index ad444070be..3525ae4d77 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -575,6 +575,15 @@ public class WorkflowExecuteThread implements Runnable { complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); logger.info(" process definition code:{} complement data: {}", processInstance.getProcessDefinitionCode(), complementListDate.toString()); + + if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) { + processInstance.setScheduleTime(complementListDate.get(0)); + processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + processService.updateProcessInstance(processInstance); + } } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index e78112a33b..4ca47c8fca 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -125,7 +125,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.facebook.presto.jdbc.internal.guava.collect.Lists; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -630,10 +629,8 @@ public class ProcessService { processInstance.setWarningGroupId(warningGroupId); processInstance.setDryRun(command.getDryRun()); - // schedule time - Date scheduleTime = getScheduleTime(command, cmdParam); - if (scheduleTime != null) { - processInstance.setScheduleTime(scheduleTime); + if (command.getScheduleTime() != null) { + processInstance.setScheduleTime(command.getScheduleTime()); } processInstance.setCommandStartTime(command.getStartTime()); processInstance.setLocations(processDefinition.getLocations()); @@ -878,13 +875,14 @@ public class ProcessService { runStatus = processInstance.getState(); break; case COMPLEMENT_DATA: - // delete all the valid tasks when complement data - List taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId()); - for (TaskInstance taskInstance : taskInstanceList) { - taskInstance.setFlag(Flag.NO); - this.updateTaskInstance(taskInstance); + // delete all the valid tasks when complement data if id is not null + if (processInstance.getId() != 0) { + List taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId()); + for (TaskInstance taskInstance : taskInstanceList) { + taskInstance.setFlag(Flag.NO); + this.updateTaskInstance(taskInstance); + } } - initComplementDataParam(processDefinition, processInstance, cmdParam); break; case REPEAT_RUNNING: // delete the recover task names from command parameter