diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 2859731143..70156c76df 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -96,6 +96,7 @@ import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.utils.DagHelper; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -784,6 +785,16 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { return; } Map cmdParam = new HashMap<>(); + // write the parameters of the nextProcessInstance to command + if (StringUtils.isNotEmpty(nextProcessInstance.getCommandParam())) { + Map commandStartParamsMap = JSONUtils.toMap(nextProcessInstance.getCommandParam()); + if (MapUtils.isNotEmpty(commandStartParamsMap)) { + Map paramsMap = JSONUtils.toMap(commandStartParamsMap.get(CMD_PARAM_START_PARAMS)); + if (MapUtils.isNotEmpty(paramsMap)) { + cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(paramsMap)); + } + } + } cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, nextInstanceId); Command command = new Command(); command.setCommandType(CommandType.RECOVER_SERIAL_WAIT); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 1d5e1f960b..f6db4de3cc 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -744,6 +744,7 @@ public class ProcessServiceImpl implements ProcessService { * @param host host * @return process instance */ + @Override public @Nullable ProcessInstance constructProcessInstance(Command command, String host) throws CronParseException, CodeGenerateException { ProcessInstance processInstance; @@ -774,7 +775,8 @@ public class ProcessServiceImpl implements ProcessService { CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command); // reset global params while repeat running and recover tolerance fault process is needed by cmdParam if (commandTypeIfComplement == CommandType.REPEAT_RUNNING || - commandTypeIfComplement == CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) { + commandTypeIfComplement == CommandType.RECOVER_TOLERANCE_FAULT_PROCESS || + commandTypeIfComplement == CommandType.RECOVER_SERIAL_WAIT) { setGlobalParamIfCommanded(processDefinition, cmdParam); }