Browse Source

[Bug-12960][Master] Fix that start parameters cannot update global variables (#13005)

* fix start params override global params bug

* set startup parameters the highest priority

* add global parameter prefix

* fix import
2.0.8-release
Aaron Wang 2 years ago committed by GitHub
parent
commit
4515d3191f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 43
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  3. 30
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  4. 31
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java
  5. 7
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java
  6. 5
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -935,6 +935,12 @@ public final class Constants {
*/
public static final String LOCALE_LANGUAGE = "language";
/**
* temporary parameter prefix
*/
public static final String START_UP_PARAMS_PREFIX = "startup-";
public static final String GLOBAL_PARAMS_PREFIX = "global-";
/**
* driver
*/

43
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -23,11 +23,15 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_ST
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import static org.apache.dolphinscheduler.common.Constants.START_UP_PARAMS_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS_PREFIX;
import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.common.enums.Direct.IN;
import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@ -688,6 +692,9 @@ public class WorkflowExecuteThread implements Runnable {
if (processInstance.isComplementData() && complementListDate.size() == 0) {
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
// reset global params while there are start parameters
setGlobalParamIfCommanded(processDefinition, cmdParam);
Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
@ -944,7 +951,7 @@ public class WorkflowExecuteThread implements Runnable {
private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
//for this taskInstance all the param in this part is IN.
thisProperty.setDirect(Direct.IN);
thisProperty.setDirect(IN);
//get the pre taskInstance Property's name
String proName = thisProperty.getProp();
//if the Previous nodes have the Property of same name
@ -1629,4 +1636,36 @@ public class WorkflowExecuteThread implements Runnable {
public Map<Integer, ITaskProcessor> getActiveTaskProcessorMaps() {
return activeTaskProcessorMaps;
}
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
// get start params from command param
Map<String, String> startParamMap = new HashMap<>();
if (cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
startParamMap = JSONUtils.toMap(startParamJson);
}
Map<String, String> fatherParamMap = new HashMap<>();
if (cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
fatherParamMap = JSONUtils.toMap(fatherParamJson);
}
startParamMap.putAll(fatherParamMap);
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
List<Property> globalParamList = processDefinition.getGlobalParamList();
if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) {
Map<String, String> tempGlobalMap = new HashMap<>();
// add prefix for global params
for (Map.Entry<String, String> param : globalMap.entrySet()) {
tempGlobalMap.put(GLOBAL_PARAMS_PREFIX+ param.getKey(), param.getValue());
}
globalParamList.forEach(property -> property.setProp(GLOBAL_PARAMS_PREFIX + property.getProp()));
// set start param into global params, add prefix for startup params
for (Entry<String, String> startParam : startParamMap.entrySet()) {
String tmpStartParamKey = START_UP_PARAMS_PREFIX + startParam.getKey();
tempGlobalMap.put(tmpStartParamKey, startParam.getValue());
globalParamList.add(new Property(tmpStartParamKey, IN, VARCHAR, startParam.getValue()));
}
processDefinition.setGlobalParamMap(tempGlobalMap);
}
}
}

30
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -26,9 +26,14 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.START_UP_PARAMS_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS_PREFIX;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.common.enums.Direct.IN;
import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -653,15 +658,22 @@ public class ProcessService {
fatherParamMap = JSONUtils.toMap(fatherParamJson);
}
startParamMap.putAll(fatherParamMap);
// set start param into global params
if (startParamMap.size() > 0
&& processDefinition.getGlobalParamMap() != null) {
for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
Map<String, String> globalMap = processDefinition.getGlobalParamMap();
List<Property> globalParamList = processDefinition.getGlobalParamList();
if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) {
Map<String, String> tempGlobalMap = new HashMap<>();
// add prefix for global params
for (Map.Entry<String, String> param : globalMap.entrySet()) {
tempGlobalMap.put(GLOBAL_PARAMS_PREFIX + param.getKey(), param.getValue());
}
globalParamList.forEach(property -> property.setProp(GLOBAL_PARAMS_PREFIX + property.getProp()));
// set start param into global params, add prefix for startup params
for (Entry<String, String> startParam : startParamMap.entrySet()) {
String tmpStartParamKey = START_UP_PARAMS_PREFIX + startParam.getKey();
tempGlobalMap.put(tmpStartParamKey, startParam.getValue());
globalParamList.add(new Property(tmpStartParamKey, IN, VARCHAR, startParam.getValue()));
}
processDefinition.setGlobalParamMap(tempGlobalMap);
}
}

31
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java

@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.spi.task.paramparser;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
import static org.apache.dolphinscheduler.spi.utils.Constants.GLOBAL_PARAMS_PREFIX;
import static org.apache.dolphinscheduler.spi.utils.Constants.START_UP_PARAMS_PREFIX;
import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.spi.enums.CommandType;
import org.apache.dolphinscheduler.spi.enums.DataType;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
@ -60,12 +62,14 @@ public class ParamUtils {
CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
Date scheduleTime = taskExecutionContext.getScheduleTime();
Map<String, Property> convertedParams = new HashMap<>();
// combining local and global parameters
Map<String, Property> localParams = parameters.getLocalParametersMap();
Map<String, Property> varParams = parameters.getVarPoolMap();
if (globalParams == null && localParams == null) {
if (MapUtils.isEmpty(globalParams) && MapUtils.isEmpty(localParams)) {
return null;
}
// if it is a complement,
@ -75,8 +79,7 @@ public class ParamUtils {
.getBusinessTime(commandType,
scheduleTime);
if (globalParamsMap != null) {
if (MapUtils.isNotEmpty(globalParamsMap)) {
params.putAll(globalParamsMap);
}
@ -87,12 +90,19 @@ public class ParamUtils {
if (globalParams != null && localParams != null) {
globalParams.putAll(localParams);
for (Map.Entry<String, Property> entry : localParams.entrySet()) {
convertedParams.put(entry.getKey(), entry.getValue());
}
} else if (globalParams == null && localParams != null) {
globalParams = localParams;
convertedParams = localParams;
}
if (varParams != null) {
varParams.putAll(globalParams);
globalParams = varParams;
for (Map.Entry<String, Property> entry : varParams.entrySet()) {
convertedParams.put(entry.getKey(), entry.getValue());
}
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) {
@ -111,9 +121,20 @@ public class ParamUtils {
val = ParameterUtils.convertParameterPlaceholders(val, params);
property.setValue(val);
}
if (property.getProp().startsWith(START_UP_PARAMS_PREFIX)) {
property.setProp(property.getProp().replaceFirst(START_UP_PARAMS_PREFIX, ""));
convertedParams.put(property.getProp(), property);
} else if (property.getProp().startsWith(GLOBAL_PARAMS_PREFIX)) {
String prop = property.getProp().replaceFirst(GLOBAL_PARAMS_PREFIX, "");
if (!convertedParams.containsKey(prop)) {
property.setProp(prop);
convertedParams.put(prop, property);
}
}
}
return globalParams;
return convertedParams;
}
/**

7
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.spi.task.paramparser;
import static org.apache.dolphinscheduler.spi.utils.Constants.GLOBAL_PARAMS_PREFIX;
import static org.apache.dolphinscheduler.spi.utils.Constants.START_UP_PARAMS_PREFIX;
import java.util.Map;
import org.slf4j.Logger;
@ -92,7 +95,9 @@ public class PlaceholderUtils {
@Override
public String resolvePlaceholder(String placeholderName) {
try {
return paramsMap.get(placeholderName);
String startUpPlaceholderName = START_UP_PARAMS_PREFIX + placeholderName;
String globalPlaceholderName = GLOBAL_PARAMS_PREFIX + placeholderName;
return paramsMap.getOrDefault(startUpPlaceholderName, paramsMap.getOrDefault(placeholderName, paramsMap.getOrDefault(globalPlaceholderName, null)));
} catch (Exception ex) {
logger.error("resolve placeholder '{}' in [ {} ]", placeholderName, value, ex);
return null;

5
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java

@ -136,6 +136,11 @@ public class Constants {
public static final String COM_DB2_JDBC_DRIVER = "com.ibm.db2.jcc.DB2Driver";
public static final String COM_PRESTO_JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver";
/**
* temporary parameter prefix
*/
public static final String START_UP_PARAMS_PREFIX = "startup-";
public static final String GLOBAL_PARAMS_PREFIX = "global-";
/**
* validation Query

Loading…
Cancel
Save