Browse Source

[Improvement-5852][server] Support two parameters related to task for the rest of type of tasks. (#5867)

* provide two system parameters to support the rest of type of tasks

* provide two system parameters to support the rest of type of tasks

* improve test conversion
2.0.7-release
Hua Jiang 3 years ago committed by GitHub
parent
commit
4a68bfbe1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 73
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
  2. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
  3. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
  4. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
  5. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
  6. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
  7. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
  8. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  9. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
  10. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  11. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
  12. 49
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
  13. 63
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java

73
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java

@ -43,69 +43,15 @@ public class ParamUtils {
/** /**
* parameter conversion * parameter conversion
* @param globalParams global params * Warning:
* @param globalParamsMap global params map * When you first invoke the function of convert, the variables of localParams and varPool in the ShellParameters will be modified.
* @param localParams local params * But in the whole system the variables of localParams and varPool have been used in other functions. I'm not sure if this current
* @param commandType command type * situation is wrong. So I cannot modify the original logic.
* @param scheduleTime schedule time *
* @return global params
*/
public static Map<String,Property> convert(Map<String,Property> globalParams,
Map<String,String> globalParamsMap,
Map<String,Property> localParams,
Map<String,Property> varParams,
CommandType commandType,
Date scheduleTime) {
if (globalParams == null && localParams == null) {
return null;
}
// if it is a complement,
// you need to pass in the task instance id to locate the time
// of the process instance complement
Map<String,String> timeParams = BusinessTimeUtils
.getBusinessTime(commandType,
scheduleTime);
if (globalParamsMap != null) {
timeParams.putAll(globalParamsMap);
}
if (globalParams != null && localParams != null) {
localParams.putAll(globalParams);
globalParams = localParams;
} else if (globalParams == null && localParams != null) {
globalParams = localParams;
}
if (varParams != null) {
varParams.putAll(globalParams);
globalParams = varParams;
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
Property property = en.getValue();
if (StringUtils.isNotEmpty(property.getValue())
&& property.getValue().startsWith("$")) {
/**
* local parameter refers to global parameter with the same name
* note: the global parameters of the process instance here are solidified parameters,
* and there are no variables in them.
*/
String val = property.getValue();
val = ParameterUtils.convertParameterPlaceholders(val, timeParams);
property.setValue(val);
}
}
return globalParams;
}
/**
* parameter conversion
* @param taskExecutionContext the context of this task instance * @param taskExecutionContext the context of this task instance
* @param parameters the parameters * @param parameters the parameters
* @return global params * @return global params
*
*/ */
public static Map<String,Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) { public static Map<String,Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) {
Preconditions.checkNotNull(taskExecutionContext); Preconditions.checkNotNull(taskExecutionContext);
@ -115,8 +61,11 @@ public class ParamUtils {
CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement()); CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
Date scheduleTime = taskExecutionContext.getScheduleTime(); Date scheduleTime = taskExecutionContext.getScheduleTime();
// combining local and global parameters
Map<String,Property> localParams = parameters.getLocalParametersMap(); Map<String,Property> localParams = parameters.getLocalParametersMap();
Map<String,Property> varParams = parameters.getVarPoolMap();
if (globalParams == null && localParams == null) { if (globalParams == null && localParams == null) {
return null; return null;
} }
@ -141,6 +90,10 @@ public class ParamUtils {
} else if (globalParams == null && localParams != null) { } else if (globalParams == null && localParams != null) {
globalParams = localParams; globalParams = localParams;
} }
if (varParams != null) {
varParams.putAll(globalParams);
globalParams = varParams;
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator(); Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next(); Map.Entry<String, Property> en = iter.next();

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task.datax;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
@ -154,13 +153,8 @@ public class DataxTask extends AbstractTask {
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId()); String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName); Thread.currentThread().setName(threadLoggerInfoName);
// combining local and global parameters // replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
dataXParameters.getLocalParametersMap(),
dataXParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// run datax procesDataSourceService.s // run datax procesDataSourceService.s
String jsonFilePath = buildDataxJsonFile(paramsMap); String jsonFilePath = buildDataxJsonFile(paramsMap);

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.flink; package org.apache.dolphinscheduler.server.worker.task.flink;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
@ -80,13 +79,8 @@ public class FlinkTask extends AbstractYarnTask {
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs(); String args = flinkParameters.getMainArgs();
// replace placeholder // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
flinkParameters.getLocalParametersMap(),
flinkParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
logger.info("param Map : {}", paramsMap); logger.info("param Map : {}", paramsMap);
if (paramsMap != null) { if (paramsMap != null) {

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.task.http; package org.apache.dolphinscheduler.server.worker.task.http;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.HttpMethod; import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.enums.HttpParametersType; import org.apache.dolphinscheduler.common.enums.HttpParametersType;
import org.apache.dolphinscheduler.common.process.HttpProperty; import org.apache.dolphinscheduler.common.process.HttpProperty;
@ -137,13 +136,9 @@ public class HttpTask extends AbstractTask {
protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException { protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException {
RequestBuilder builder = createRequestBuilder(); RequestBuilder builder = createRequestBuilder();
// replace placeholder // replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
httpParameters.getLocalParametersMap(),
httpParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
List<HttpProperty> httpPropertyList = new ArrayList<>(); List<HttpProperty> httpPropertyList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) { if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) {
for (HttpProperty httpProperty : httpParameters.getHttpParams()) { for (HttpProperty httpProperty : httpParameters.getHttpParams()) {

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.task.mr; package org.apache.dolphinscheduler.server.worker.task.mr;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
@ -84,13 +83,8 @@ public class MapReduceTask extends AbstractYarnTask {
mapreduceParameters.setQueue(taskExecutionContext.getQueue()); mapreduceParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName(); setMainJarName();
// replace placeholder // replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
mapreduceParameters.getLocalParametersMap(),
mapreduceParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null) { if (paramsMap != null) {
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap)); String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java

@ -30,7 +30,6 @@ import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.ConnectionParam; import org.apache.dolphinscheduler.common.datasource.ConnectionParam;
import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.Direct;
@ -119,12 +118,7 @@ public class ProcedureTask extends AbstractTask {
connection = DatasourceUtil.getConnection(dbType, connectionParam); connection = DatasourceUtil.getConnection(dbType, connectionParam);
// combining local and global parameters // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
procedureParameters.getLocalParametersMap(),
procedureParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// call method // call method
stmt = connection.prepareCall(procedureParameters.getMethod()); stmt = connection.prepareCall(procedureParameters.getMethod());

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java

@ -14,25 +14,26 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.task.python;
package org.apache.dolphinscheduler.server.worker.task.python;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.slf4j.Logger;
import java.util.Map; import java.util.Map;
import org.slf4j.Logger;
/** /**
* python task * python task
*/ */
@ -115,13 +116,8 @@ public class PythonTask extends AbstractTask {
private String buildCommand() throws Exception { private String buildCommand() throws Exception {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// replace placeholder // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
pythonParameters.getLocalParametersMap(),
pythonParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
try { try {
rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript); rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -21,7 +21,6 @@ import static java.util.Calendar.DAY_OF_MONTH;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@ -42,10 +41,8 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -166,7 +163,7 @@ public class ShellTask extends AbstractTask {
private String parseScript(String script) { private String parseScript(String script) {
// combining local and global parameters // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,shellParameters); Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) { if (taskExecutionContext.getScheduleTime() != null) {

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.spark; package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.SparkVersion; import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
@ -109,13 +108,8 @@ public class SparkTask extends AbstractYarnTask {
// other parameters // other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
// replace placeholder // replace placeholder, and combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
sparkParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
String command = null; String command = null;

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
@ -166,14 +165,8 @@ public class SqlTask extends AbstractTask {
Map<Integer, Property> sqlParamsMap = new HashMap<>(); Map<Integer, Property> sqlParamsMap = new HashMap<>();
StringBuilder sqlBuilder = new StringBuilder(); StringBuilder sqlBuilder = new StringBuilder();
// find process instance by task id // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sqlParameters.getLocalParametersMap(),
sqlParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// spell SQL according to the final user-defined variable // spell SQL according to the final user-defined variable
if (paramsMap == null) { if (paramsMap == null) {
@ -276,7 +269,6 @@ public class SqlTask extends AbstractTask {
} }
} }
public String setNonQuerySqlReturn(String updateResult, List<Property> properties) { public String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
String result = null; String result = null;
for (Property info :properties) { for (Property info :properties) {

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.sqoop; package org.apache.dolphinscheduler.server.worker.task.sqoop;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@ -73,12 +72,8 @@ public class SqoopTask extends AbstractYarnTask {
SqoopJobGenerator generator = new SqoopJobGenerator(); SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext); String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()), // combining local and global parameters
sqoopTaskExecutionContext.getDefinedParams(), Map<String, Property> paramsMap = ParamUtils.convert(sqoopTaskExecutionContext,getParameters());
sqoopParameters.getLocalParametersMap(),
sqoopParameters.getVarPoolMap(),
CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
sqoopTaskExecutionContext.getScheduleTime());
if (paramsMap != null) { if (paramsMap != null) {
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));

49
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java

@ -14,27 +14,21 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master; package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* user define param * user define param
*/ */
@ -42,9 +36,8 @@ public class ParamsTest {
private static final Logger logger = LoggerFactory.getLogger(ParamsTest.class); private static final Logger logger = LoggerFactory.getLogger(ParamsTest.class);
@Test @Test
public void systemParamsTest()throws Exception{ public void systemParamsTest() throws Exception {
String command = "${system.biz.date}"; String command = "${system.biz.date}";
// start process // start process
@ -56,12 +49,10 @@ public class ParamsTest {
logger.info("start process : {}",command); logger.info("start process : {}",command);
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date()); calendar.setTime(new Date());
calendar.add(Calendar.DAY_OF_MONTH, -5); calendar.add(Calendar.DAY_OF_MONTH, -5);
command = "${system.biz.date}"; command = "${system.biz.date}";
// complement data // complement data
timeParams = BusinessTimeUtils timeParams = BusinessTimeUtils
@ -71,40 +62,4 @@ public class ParamsTest {
logger.info("complement data : {}",command); logger.info("complement data : {}",command);
} }
@Test
public void convertTest() throws Exception {
Map<String, Property> globalParams = new HashMap<>();
Property property = new Property();
property.setProp("global_param");
property.setDirect(Direct.IN);
property.setType(DataType.VARCHAR);
property.setValue("${system.biz.date}");
globalParams.put("global_param", property);
Map<String, String> globalParamsMap = new HashMap<>();
globalParamsMap.put("global_param", "${system.biz.date}");
Map<String, Property> localParams = new HashMap<>();
Property localProperty = new Property();
localProperty.setProp("local_param");
localProperty.setDirect(Direct.IN);
localProperty.setType(DataType.VARCHAR);
localProperty.setValue("${global_param}");
localParams.put("local_param", localProperty);
Map<String, Property> varPoolParams = new HashMap<>();
Property varProperty = new Property();
varProperty.setProp("local_param");
varProperty.setDirect(Direct.IN);
varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}");
varPoolParams.put("varPool", varProperty);
Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap,
localParams,varPoolParams, CommandType.START_PROCESS, new Date());
logger.info(JSONUtils.toJsonString(paramsMap));
}
} }

63
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java

@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
@ -85,7 +84,7 @@ public class ParamUtilsTest {
localParams.put("local_param", localProperty); localParams.put("local_param", localProperty);
Property varProperty = new Property(); Property varProperty = new Property();
varProperty.setProp("local_param"); varProperty.setProp("varPool");
varProperty.setDirect(Direct.IN); varProperty.setDirect(Direct.IN);
varProperty.setType(DataType.VARCHAR); varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}"); varProperty.setValue("${global_param}");
@ -93,42 +92,72 @@ public class ParamUtilsTest {
} }
/** /**
* Test convert * This is basic test case for ParamUtils.convert.
* Warning:
* As you can see,this case invokes the function of convert in different situations. When you first invoke the function of convert,
* the variables of localParams and varPool in the ShellParameters will be modified. But in the whole system the variables of localParams
* and varPool have been used in other functions. I'm not sure if this current situation is wrong. So I cannot modify the original logic.
*/ */
@Test @Test
public void testConvert() { public void testConvert() {
//The expected value //The expected value
String expected = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," String expected = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//The expected value when globalParams is null but localParams is not null //The expected value when globalParams is null but localParams is not null
String expected1 = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," String expected1 = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//Define expected date , the month is 0-base //Define expected date , the month is 0-base
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
calendar.set(2019, 11, 30); calendar.set(2019, 11, 30);
Date date = calendar.getTime(); Date date = calendar.getTime();
List<Property> globalParamList = globalParams.values().stream().collect(Collectors.toList());
List<Property> localParamList = localParams.values().stream().collect(Collectors.toList());
List<Property> varPoolParamList = varPoolParams.values().stream().collect(Collectors.toList());
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("params test");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp/test");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(0);
taskExecutionContext.setScheduleTime(date);
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
taskExecutionContext.setDefinedParams(globalParamsMap);
taskExecutionContext.setVarPool("[{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${global_param}\"}]");
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\","
+ "\"localParams\":"
+ "[],\"resourceList\":[]}");
ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class);
shellParameters.setLocalParams(localParamList);
String varPoolParamsJson = JSONUtils.toJsonString(varPoolParams,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
shellParameters.setVarPool(taskExecutionContext.getVarPool());
shellParameters.dealOutParam(varPoolParamsJson);
//Invoke convert //Invoke convert
Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap, localParams, varPoolParams,CommandType.START_PROCESS, date); Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters);
String result = JSONUtils.toJsonString(paramsMap); String result = JSONUtils.toJsonString(paramsMap);
assertEquals(expected, result); assertEquals(expected, result);
for (Map.Entry<String, Property> entry : paramsMap.entrySet()) {
String key = entry.getKey();
Property prop = entry.getValue();
logger.info(key + " : " + prop.getValue());
}
//Invoke convert with null globalParams //Invoke convert with null globalParams
Map<String, Property> paramsMap1 = ParamUtils.convert(null, globalParamsMap, localParams,varPoolParams, CommandType.START_PROCESS, date); taskExecutionContext.setDefinedParams(null);
Map<String, Property> paramsMap1 = ParamUtils.convert(taskExecutionContext, shellParameters);
String result1 = JSONUtils.toJsonString(paramsMap1); String result1 = JSONUtils.toJsonString(paramsMap1);
assertEquals(expected1, result1); assertEquals(expected1, result1);
//Null check, invoke convert with null globalParams and null localParams // Null check, invoke convert with null globalParams and null localParams
Map<String, Property> paramsMap2 = ParamUtils.convert(null, globalParamsMap, null, varPoolParams,CommandType.START_PROCESS, date); shellParameters.setLocalParams(null);
Map<String, Property> paramsMap2 = ParamUtils.convert(taskExecutionContext, shellParameters);
assertNull(paramsMap2); assertNull(paramsMap2);
} }

Loading…
Cancel
Save