diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 7a47107249..f50b6383b8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.entity; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; @@ -221,6 +222,19 @@ public class TaskExecutionContext implements Serializable { */ private String varPool; + /** + * business param + */ + private Map paramsMap; + + public Map getParamsMap() { + return paramsMap; + } + + public void setParamsMap(Map paramsMap) { + this.paramsMap = paramsMap; + } + /** * procedure TaskExecutionContext */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 50847f7e13..73a66384be 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,6 +17,10 @@ package org.apache.dolphinscheduler.server.worker.runner; +import static java.util.Calendar.DAY_OF_MONTH; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -153,6 +157,7 @@ public class TaskExecuteThread implements Runnable, Delayed { task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService); // task init task.init(); + preBuildBusinessParams(); //init varPool task.getParameters().setVarPool(taskExecutionContext.getVarPool()); // task handle @@ -182,6 +187,23 @@ public class TaskExecuteThread implements Runnable, Delayed { } } + private void preBuildBusinessParams() { + Map paramsMap = new HashMap<>(); + // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job + if (taskExecutionContext.getScheduleTime() != null) { + Date date = taskExecutionContext.getScheduleTime(); + if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) { + date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1); + } + String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME); + Property p = new Property(); + p.setValue(dateTime); + p.setProp(Constants.PARAMETER_DATETIME); + paramsMap.put(Constants.PARAMETER_DATETIME, p); + } + taskExecutionContext.setParamsMap(paramsMap); + } + /** * when task finish, clear execute path. */ @@ -227,7 +249,6 @@ public class TaskExecuteThread implements Runnable, Delayed { return globalParamsMap; } - /** * kill task */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index c30326d03e..aa4e2ceb30 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.commons.collections.MapUtils; import org.apache.commons.io.FileUtils; import java.io.File; @@ -56,6 +57,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -155,6 +157,12 @@ public class DataxTask extends AbstractTask { // replace placeholder,and combine local and global parameters Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } // run datax procesDataSourceService.s String jsonFilePath = buildDataxJsonFile(paramsMap); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index 863b91aaf7..928edc5096 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -30,7 +30,10 @@ import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; +import org.apache.commons.collections.MapUtils; + import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -81,12 +84,16 @@ public class FlinkTask extends AbstractYarnTask { // combining local and global parameters Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } logger.info("param Map : {}", paramsMap); - if (paramsMap != null) { - args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); - logger.info("param args : {}", args); - } + args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); + logger.info("param args : {}", args); flinkParameters.setMainArgs(args); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index 4e34741577..2c9ccc45f3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.commons.collections.MapUtils; import org.apache.commons.io.Charsets; import org.apache.http.HttpEntity; import org.apache.http.ParseException; @@ -49,6 +50,7 @@ import org.apache.http.util.EntityUtils; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -138,6 +140,12 @@ public class HttpTask extends AbstractTask { // replace placeholder,and combine local and global parameters Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } List httpPropertyList = new ArrayList<>(); if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index 5e8f3ca932..c7fdba46ba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -31,7 +31,10 @@ import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; +import org.apache.commons.collections.MapUtils; + import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -85,14 +88,18 @@ public class MapReduceTask extends AbstractYarnTask { // replace placeholder,and combine local and global parameters Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } - if (paramsMap != null) { - String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap)); - mapreduceParameters.setMainArgs(args); - if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) { - String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(), ParamUtils.convert(paramsMap)); - mapreduceParameters.setOthers(others); - } + String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap)); + mapreduceParameters.setMainArgs(args); + if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) { + String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(), ParamUtils.convert(paramsMap)); + mapreduceParameters.setOthers(others); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index 0ee480d7df..5ffa6cadb6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -30,6 +30,9 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor; +import org.apache.commons.collections.MapUtils; + +import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; @@ -118,6 +121,12 @@ public class PythonTask extends AbstractTask { // combining local and global parameters Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } try { rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript); @@ -125,10 +134,8 @@ public class PythonTask extends AbstractTask { catch (StringIndexOutOfBoundsException e) { logger.error("setShareVar field format error, raw python script : {}", rawPythonScript); } - - if (paramsMap != null) { - rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); - } + + rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); logger.info("raw python script : {}", pythonParameters.getRawScript()); logger.info("task dir : {}", taskDir); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 32c2ad18fe..31b7447cb2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -17,14 +17,10 @@ package org.apache.dolphinscheduler.server.worker.task.shell; -import static java.util.Calendar.DAY_OF_MONTH; - import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; -import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; @@ -34,6 +30,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.commons.collections.MapUtils; + import java.io.File; import java.nio.file.Files; import java.nio.file.Path; @@ -41,7 +39,6 @@ import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; -import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -164,21 +161,11 @@ public class ShellTask extends AbstractTask { private String parseScript(String script) { // combining local and global parameters Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); - - // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job - if (taskExecutionContext.getScheduleTime() != null) { - if (paramsMap == null) { - paramsMap = new HashMap<>(); - } - Date date = taskExecutionContext.getScheduleTime(); - if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) { - date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1); - } - String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME); - Property p = new Property(); - p.setValue(dateTime); - p.setProp(Constants.PARAMETER_DATETIME); - paramsMap.put(Constants.PARAMETER_DATETIME, p); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { + paramsMap.putAll(taskExecutionContext.getParamsMap()); } return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index 6939439ef6..64c60b0b94 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -30,7 +30,10 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.SparkArgsUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; +import org.apache.commons.collections.MapUtils; + import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -110,12 +113,14 @@ public class SparkTask extends AbstractYarnTask { // replace placeholder, and combining local and global parameters Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); - - String command = null; - - if (null != paramsMap) { - command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap)); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } + + String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap)); logger.info("spark task command: {}", command); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 3c4b3ab273..ee2265c43b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -169,9 +169,15 @@ public class SqlTask extends AbstractTask { // combining local and global parameters Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } // spell SQL according to the final user-defined variable - if (paramsMap == null) { + if (MapUtils.isEmpty(paramsMap)) { sqlBuilder.append(sql); return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java index 2f3e48dc4c..ce91199bd6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java @@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; +import org.apache.commons.collections.MapUtils; + +import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; @@ -74,8 +77,14 @@ public class SqoopTask extends AbstractYarnTask { // combining local and global parameters Map paramsMap = ParamUtils.convert(sqoopTaskExecutionContext,getParameters()); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); + } + if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())) { + paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap()); + } - if (paramsMap != null) { + if (MapUtils.isNotEmpty(paramsMap)) { String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); logger.info("sqoop script: {}", resultScripts); return resultScripts;