|
|
@ -96,6 +96,7 @@ import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; |
|
|
|
import org.apache.dolphinscheduler.service.utils.DagHelper; |
|
|
|
import org.apache.dolphinscheduler.service.utils.DagHelper; |
|
|
|
|
|
|
|
|
|
|
|
import org.apache.commons.collections4.CollectionUtils; |
|
|
|
import org.apache.commons.collections4.CollectionUtils; |
|
|
|
|
|
|
|
import org.apache.commons.collections4.MapUtils; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.apache.commons.lang3.tuple.Pair; |
|
|
|
import org.apache.commons.lang3.tuple.Pair; |
|
|
|
|
|
|
|
|
|
|
@ -784,6 +785,16 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
Map<String, Object> cmdParam = new HashMap<>(); |
|
|
|
Map<String, Object> cmdParam = new HashMap<>(); |
|
|
|
|
|
|
|
// write the parameters of the nextProcessInstance to command
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(nextProcessInstance.getCommandParam())) { |
|
|
|
|
|
|
|
Map<String, String> commandStartParamsMap = JSONUtils.toMap(nextProcessInstance.getCommandParam()); |
|
|
|
|
|
|
|
if (MapUtils.isNotEmpty(commandStartParamsMap)) { |
|
|
|
|
|
|
|
Map<String, String> paramsMap = JSONUtils.toMap(commandStartParamsMap.get(CMD_PARAM_START_PARAMS)); |
|
|
|
|
|
|
|
if (MapUtils.isNotEmpty(paramsMap)) { |
|
|
|
|
|
|
|
cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(paramsMap)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, nextInstanceId); |
|
|
|
cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, nextInstanceId); |
|
|
|
Command command = new Command(); |
|
|
|
Command command = new Command(); |
|
|
|
command.setCommandType(CommandType.RECOVER_SERIAL_WAIT); |
|
|
|
command.setCommandType(CommandType.RECOVER_SERIAL_WAIT); |
|
|
|