diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index d6d571f145..eb71239f48 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -469,6 +469,8 @@ public final class Constants { public static final String CMD_PARAM_START_PARAMS = "StartParams"; + public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams"; + /** * complement data start date */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 80bd3f7a9d..b7a4d00380 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -557,7 +557,7 @@ public class MasterExecThread implements Runnable { private void setProcessGlobal(TaskNode taskNode, TaskInstance taskInstance) { String globalParams = this.processInstance.getGlobalParams(); if (StringUtils.isNotEmpty(globalParams)) { - Map globalMap = getGlobalParamMap(globalParams); + Map globalMap = processService.getGlobalParamMap(globalParams); if (globalMap != null && globalMap.size() != 0) { setGlobalMapToTask(taskNode, taskInstance, globalMap); } @@ -586,17 +586,6 @@ public class MasterExecThread implements Runnable { } } - public Map getGlobalParamMap(String globalParams) { - List propList; - Map globalParamMap = new HashMap<>(); - if (StringUtils.isNotEmpty(globalParams)) { - propList = JSONUtils.toList(globalParams, Property.class); - globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); - } - - return globalParamMap; - } - private void submitPostNode(String parentNodeName) { Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList); List taskInstances = new ArrayList<>(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index a759c24e50..fe73eaedfb 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; @@ -586,14 +587,19 @@ public class ProcessService { private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam) { // get start params from command param - Map startParamMap = null; + Map startParamMap = new HashMap<>(); if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) { String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS); startParamMap = JSONUtils.toMap(startParamJson); } - + Map fatherParamMap = new HashMap<>(); + if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) { + String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS); + fatherParamMap = JSONUtils.toMap(fatherParamJson); + } + startParamMap.putAll(fatherParamMap); // set start param into global params - if (startParamMap != null && startParamMap.size() > 0 + if (startParamMap.size() > 0 && processDefinition.getGlobalParamMap() != null) { for (Map.Entry param : processDefinition.getGlobalParamMap().entrySet()) { String val = startParamMap.get(param.getKey()); @@ -1065,7 +1071,7 @@ public class ProcessService { /** * complement data needs transform parent parameter to child. */ - private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) { + private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,Map fatherParams) { // set sub work process command String processMapStr = JSONUtils.toJsonString(instanceMap); Map cmdParam = JSONUtils.toMap(processMapStr); @@ -1077,9 +1083,24 @@ public class ProcessService { cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime); processMapStr = JSONUtils.toJsonString(cmdParam); } + if (fatherParams.size() != 0) { + cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams)); + processMapStr = JSONUtils.toJsonString(cmdParam); + } return processMapStr; } + public Map getGlobalParamMap(String globalParams) { + List propList; + Map globalParamMap = new HashMap<>(); + if (StringUtils.isNotEmpty(globalParams)) { + propList = JSONUtils.toList(globalParams, Property.class); + globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); + } + + return globalParamMap; + } + /** * create sub work process command */ @@ -1089,9 +1110,18 @@ public class ProcessService { TaskInstance task) { CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); - Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); - Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)); - String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance); + Map subProcessParam = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class); + Integer childDefineId = Integer.parseInt(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID))); + Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); + List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + Map globalMap = this.getGlobalParamMap(parentProcessInstance.getGlobalParams()); + Map fatherParams = new HashMap<>(); + if (CollectionUtils.isNotEmpty(allParam)) { + for (Property info : allParam) { + fatherParams.put(info.getProp(), globalMap.get(info.getProp())); + } + } + String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance,fatherParams); return new Command( commandType, @@ -1601,7 +1631,7 @@ public class ProcessService { if (property == null) { continue; } - String value = row.get(paramName); + String value = String.valueOf(row.get(paramName)); if (StringUtils.isNotEmpty(value)) { property.setValue(value); info.setValue(value);