From 12d7e6261e12a8fa99d61a0f7475d60ecac5b33a Mon Sep 17 00:00:00 2001 From: Assert <42203474+shangeyao@users.noreply.github.com> Date: Thu, 13 Jan 2022 10:42:28 +0800 Subject: [PATCH] [Improvement][MasterServer] process complement data optimization #7925 (#7976) * [Improvement][MasterServer] process complement data optimization #7925 * [Improvement][MasterServer] process complement data optimization #7925 Co-authored-by: shangeyao --- .../master/runner/WorkflowExecuteThread.java | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index d8d8720b7f..911439dcff 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -609,28 +609,40 @@ public class WorkflowExecuteThread { processInstance.getScheduleTime(), complementListDate.toString()); scheduleDate = complementListDate.get(index + 1); - //the next process complement - processInstance.setId(0); } - processInstance.setScheduleTime(scheduleDate); + //the next process complement + int create = this.createComplementDataCommand(scheduleDate); + if (create > 0) { + logger.info("create complement data command successfully."); + } + return true; + } + + private int createComplementDataCommand(Date scheduleDate) { + Command command = new Command(); + command.setScheduleTime(scheduleDate); + command.setCommandType(CommandType.COMPLEMENT_DATA); + command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) { cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING); - processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); - } - - processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); - processInstance.setStartTime(new Date()); - processInstance.setRestartTime(processInstance.getStartTime()); - processInstance.setEndTime(null); - processService.saveProcessInstance(processInstance); - this.taskInstanceMap.clear(); - startProcess(); - return true; + } + cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss")); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + command.setTaskDependType(processInstance.getTaskDependType()); + command.setFailureStrategy(processInstance.getFailureStrategy()); + command.setWarningType(processInstance.getWarningType()); + command.setWarningGroupId(processInstance.getWarningGroupId()); + command.setStartTime(new Date()); + command.setExecutorId(processInstance.getExecutorId()); + command.setUpdateTime(new Date()); + command.setProcessInstancePriority(processInstance.getProcessInstancePriority()); + command.setWorkerGroup(processInstance.getWorkerGroup()); + command.setEnvironmentCode(processInstance.getEnvironmentCode()); + command.setDryRun(processInstance.getDryRun()); + command.setProcessInstanceId(0); + command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion()); + return processService.createCommand(command); } private boolean needComplementProcess() {