Browse Source

[Bug] [MASTER-9811]fix cmd param to overwrite global param when executing complement (#9952)

* fix cmd param to overwrite global param when executing complement

* fix cmd param to overwrite global param when executing complement
3.1.0-release
Tq 2 years ago committed by GitHub
parent
commit
d4aeee16e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  2. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

38
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -24,6 +24,8 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -942,6 +944,9 @@ public class WorkflowExecuteThread {
if (processInstance.isComplementData() && complementListDate.size() == 0) {
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
// reset global params while there are start parameters
setGlobalParamIfCommanded(processDefinition, cmdParam);
Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
@ -1960,4 +1965,37 @@ public class WorkflowExecuteThread {
}
}
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
// get start params from command param
Map<String, String> startParamMap = new HashMap<>();
if (cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
startParamMap = JSONUtils.toMap(startParamJson);
}
Map<String, String> fatherParamMap = new HashMap<>();
if (cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
fatherParamMap = JSONUtils.toMap(fatherParamJson);
}
startParamMap.putAll(fatherParamMap);
// set start param into global params
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
List<Property> globalParamList = processDefinition.getGlobalParamList();
if (startParamMap.size() > 0 && globalMap != null) {
//start param to overwrite global param
for (Map.Entry<String, String> param : globalMap.entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
//start param to create new global param if global not exist
for (Map.Entry<String, String> startParam : startParamMap.entrySet()) {
if (!globalMap.containsKey(startParam.getKey())) {
globalMap.put(startParam.getKey(), startParam.getValue());
globalParamList.add(new Property(startParam.getKey(), IN, VARCHAR, startParam.getValue()));
}
}
}
}
}

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -818,14 +818,15 @@ public class ProcessServiceImpl implements ProcessService {
// set start param into global params
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
List<Property> globalParamList = processDefinition.getGlobalParamList();
if (startParamMap.size() > 0
&& globalMap != null) {
if (startParamMap.size() > 0 && globalMap != null) {
//start param to overwrite global param
for (Map.Entry<String, String> param : globalMap.entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
//start param to create new global param if global not exist
for (Entry<String, String> startParam : startParamMap.entrySet()) {
if (!globalMap.containsKey(startParam.getKey())) {
globalMap.put(startParam.getKey(), startParam.getValue());

Loading…
Cancel
Save