From 4515d3191fbb018bb215f6eca3c3a7d48f9f1d37 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Sun, 25 Dec 2022 13:35:33 +0800 Subject: [PATCH] [Bug-12960][Master] Fix that start parameters cannot update global variables (#13005) * fix start params override global params bug * set startup parameters the highest priority * add global parameter prefix * fix import --- .../dolphinscheduler/common/Constants.java | 6 +++ .../master/runner/WorkflowExecuteThread.java | 43 ++++++++++++++++++- .../service/process/ProcessService.java | 28 ++++++++---- .../spi/task/paramparser/ParamUtils.java | 31 ++++++++++--- .../task/paramparser/PlaceholderUtils.java | 7 ++- .../dolphinscheduler/spi/utils/Constants.java | 5 +++ .../plugin/task/shell/ShellTask.java | 2 +- 7 files changed, 105 insertions(+), 17 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 1f6558a0db..70fab13f76 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -935,6 +935,12 @@ public final class Constants { */ public static final String LOCALE_LANGUAGE = "language"; + /** + * temporary parameter prefix + */ + public static final String START_UP_PARAMS_PREFIX = "startup-"; + public static final String GLOBAL_PARAMS_PREFIX = "global-"; + /** * driver */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 8a0f5f2986..8f96c20c7d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -23,11 +23,15 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_ST import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; +import static org.apache.dolphinscheduler.common.Constants.START_UP_PARAMS_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS_PREFIX; +import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR; +import static org.apache.dolphinscheduler.common.enums.Direct.IN; +import org.apache.commons.collections4.MapUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DependResult; -import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; @@ -688,6 +692,9 @@ public class WorkflowExecuteThread implements Runnable { if (processInstance.isComplementData() && complementListDate.size() == 0) { Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { + // reset global params while there are start parameters + setGlobalParamIfCommanded(processDefinition, cmdParam); + Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); @@ -944,7 +951,7 @@ public class WorkflowExecuteThread implements Runnable { private void setVarPoolValue(Map allProperty, Map allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) { //for this taskInstance all the param in this part is IN. - thisProperty.setDirect(Direct.IN); + thisProperty.setDirect(IN); //get the pre taskInstance Property's name String proName = thisProperty.getProp(); //if the Previous nodes have the Property of same name @@ -1629,4 +1636,36 @@ public class WorkflowExecuteThread implements Runnable { public Map getActiveTaskProcessorMaps() { return activeTaskProcessorMaps; } + + private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam) { + // get start params from command param + Map startParamMap = new HashMap<>(); + if (cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) { + String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS); + startParamMap = JSONUtils.toMap(startParamJson); + } + Map fatherParamMap = new HashMap<>(); + if (cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) { + String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS); + fatherParamMap = JSONUtils.toMap(fatherParamJson); + } + startParamMap.putAll(fatherParamMap); + Map globalMap = processDefinition.getGlobalParamMap(); + List globalParamList = processDefinition.getGlobalParamList(); + if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) { + Map tempGlobalMap = new HashMap<>(); + // add prefix for global params + for (Map.Entry param : globalMap.entrySet()) { + tempGlobalMap.put(GLOBAL_PARAMS_PREFIX+ param.getKey(), param.getValue()); + } + globalParamList.forEach(property -> property.setProp(GLOBAL_PARAMS_PREFIX + property.getProp())); + // set start param into global params, add prefix for startup params + for (Entry startParam : startParamMap.entrySet()) { + String tmpStartParamKey = START_UP_PARAMS_PREFIX + startParam.getKey(); + tempGlobalMap.put(tmpStartParamKey, startParam.getValue()); + globalParamList.add(new Property(tmpStartParamKey, IN, VARCHAR, startParam.getValue())); + } + processDefinition.setGlobalParamMap(tempGlobalMap); + } + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index b79e31bee1..1d017eb4db 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -26,9 +26,14 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.START_UP_PARAMS_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS_PREFIX; import static java.util.stream.Collectors.toSet; +import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR; +import static org.apache.dolphinscheduler.common.enums.Direct.IN; +import org.apache.commons.collections4.MapUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -653,15 +658,22 @@ public class ProcessService { fatherParamMap = JSONUtils.toMap(fatherParamJson); } startParamMap.putAll(fatherParamMap); - // set start param into global params - if (startParamMap.size() > 0 - && processDefinition.getGlobalParamMap() != null) { - for (Map.Entry param : processDefinition.getGlobalParamMap().entrySet()) { - String val = startParamMap.get(param.getKey()); - if (val != null) { - param.setValue(val); - } + Map globalMap = processDefinition.getGlobalParamMap(); + List globalParamList = processDefinition.getGlobalParamList(); + if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) { + Map tempGlobalMap = new HashMap<>(); + // add prefix for global params + for (Map.Entry param : globalMap.entrySet()) { + tempGlobalMap.put(GLOBAL_PARAMS_PREFIX + param.getKey(), param.getValue()); + } + globalParamList.forEach(property -> property.setProp(GLOBAL_PARAMS_PREFIX + property.getProp())); + // set start param into global params, add prefix for startup params + for (Entry startParam : startParamMap.entrySet()) { + String tmpStartParamKey = START_UP_PARAMS_PREFIX + startParam.getKey(); + tempGlobalMap.put(tmpStartParamKey, startParam.getValue()); + globalParamList.add(new Property(tmpStartParamKey, IN, VARCHAR, startParam.getValue())); } + processDefinition.setGlobalParamMap(tempGlobalMap); } } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java index 069f941ff8..a96fd29883 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java @@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.spi.task.paramparser; import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_TASK_EXECUTE_PATH; import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_TASK_INSTANCE_ID; - +import static org.apache.dolphinscheduler.spi.utils.Constants.GLOBAL_PARAMS_PREFIX; +import static org.apache.dolphinscheduler.spi.utils.Constants.START_UP_PARAMS_PREFIX; +import org.apache.commons.collections4.MapUtils; import org.apache.dolphinscheduler.spi.enums.CommandType; import org.apache.dolphinscheduler.spi.enums.DataType; import org.apache.dolphinscheduler.spi.task.AbstractParameters; @@ -60,12 +62,14 @@ public class ParamUtils { CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement()); Date scheduleTime = taskExecutionContext.getScheduleTime(); + Map convertedParams = new HashMap<>(); + // combining local and global parameters Map localParams = parameters.getLocalParametersMap(); Map varParams = parameters.getVarPoolMap(); - if (globalParams == null && localParams == null) { + if (MapUtils.isEmpty(globalParams) && MapUtils.isEmpty(localParams)) { return null; } // if it is a complement, @@ -75,8 +79,7 @@ public class ParamUtils { .getBusinessTime(commandType, scheduleTime); - if (globalParamsMap != null) { - + if (MapUtils.isNotEmpty(globalParamsMap)) { params.putAll(globalParamsMap); } @@ -87,12 +90,19 @@ public class ParamUtils { if (globalParams != null && localParams != null) { globalParams.putAll(localParams); + for (Map.Entry entry : localParams.entrySet()) { + convertedParams.put(entry.getKey(), entry.getValue()); + } } else if (globalParams == null && localParams != null) { globalParams = localParams; + convertedParams = localParams; } if (varParams != null) { varParams.putAll(globalParams); globalParams = varParams; + for (Map.Entry entry : varParams.entrySet()) { + convertedParams.put(entry.getKey(), entry.getValue()); + } } Iterator> iter = globalParams.entrySet().iterator(); while (iter.hasNext()) { @@ -111,9 +121,20 @@ public class ParamUtils { val = ParameterUtils.convertParameterPlaceholders(val, params); property.setValue(val); } + + if (property.getProp().startsWith(START_UP_PARAMS_PREFIX)) { + property.setProp(property.getProp().replaceFirst(START_UP_PARAMS_PREFIX, "")); + convertedParams.put(property.getProp(), property); + } else if (property.getProp().startsWith(GLOBAL_PARAMS_PREFIX)) { + String prop = property.getProp().replaceFirst(GLOBAL_PARAMS_PREFIX, ""); + if (!convertedParams.containsKey(prop)) { + property.setProp(prop); + convertedParams.put(prop, property); + } + } } - return globalParams; + return convertedParams; } /** diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java index 90ee18311a..89ca56faf9 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.spi.task.paramparser; +import static org.apache.dolphinscheduler.spi.utils.Constants.GLOBAL_PARAMS_PREFIX; +import static org.apache.dolphinscheduler.spi.utils.Constants.START_UP_PARAMS_PREFIX; + import java.util.Map; import org.slf4j.Logger; @@ -92,7 +95,9 @@ public class PlaceholderUtils { @Override public String resolvePlaceholder(String placeholderName) { try { - return paramsMap.get(placeholderName); + String startUpPlaceholderName = START_UP_PARAMS_PREFIX + placeholderName; + String globalPlaceholderName = GLOBAL_PARAMS_PREFIX + placeholderName; + return paramsMap.getOrDefault(startUpPlaceholderName, paramsMap.getOrDefault(placeholderName, paramsMap.getOrDefault(globalPlaceholderName, null))); } catch (Exception ex) { logger.error("resolve placeholder '{}' in [ {} ]", placeholderName, value, ex); return null; diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java index abe0672241..33e34b37f8 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java @@ -136,6 +136,11 @@ public class Constants { public static final String COM_DB2_JDBC_DRIVER = "com.ibm.db2.jcc.DB2Driver"; public static final String COM_PRESTO_JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver"; + /** + * temporary parameter prefix + */ + public static final String START_UP_PARAMS_PREFIX = "startup-"; + public static final String GLOBAL_PARAMS_PREFIX = "global-"; /** * validation Query diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java index 5d9fb80ce1..43859e8da1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java @@ -158,7 +158,7 @@ public class ShellTask extends AbstractTaskExecutor { private String parseScript(String script) { // combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); + Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); if (MapUtils.isEmpty(paramsMap)) { paramsMap = new HashMap<>(); }