diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 39f8898254..225dc2367d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -660,6 +660,16 @@ public final class Constants { */ public static final String PARAMETER_BUSINESS_DATE = "system.biz.date"; + /** + * the absolute path of current executing task + */ + public static final String PARAMETER_TASK_EXECUTE_PATH = "system.task.execute.path"; + + /** + * the instance id of current task + */ + public static final String PARAMETER_TASK_INSTANCE_ID = "system.task.instance.id"; + /** * ACCEPTED */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java index 9e929e36bf..2a1e2ac0b0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java @@ -31,12 +31,12 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; -import com.fasterxml.jackson.core.JsonProcessingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java index 904ca97c98..545bbb2274 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java @@ -144,4 +144,5 @@ public class LoggerUtils { , String info) { optionalLogger.ifPresent((Logger logger) -> logger.info(info)); } + } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java index a49d915ff9..cbf663fce2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java @@ -17,13 +17,19 @@ package org.apache.dolphinscheduler.server.utils; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; + +import org.apache.logging.log4j.util.Strings; import java.util.Date; import java.util.HashMap; @@ -95,6 +101,67 @@ public class ParamUtils { return globalParams; } + /** + * parameter conversion + * @param taskExecutionContext the context of this task instance + * @param parameters the parameters + * @return global params + */ + public static Map convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) { + Preconditions.checkNotNull(taskExecutionContext); + Preconditions.checkNotNull(parameters); + Map globalParams = getUserDefParamsMap(taskExecutionContext.getDefinedParams()); + Map globalParamsMap = taskExecutionContext.getDefinedParams(); + CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement()); + Date scheduleTime = taskExecutionContext.getScheduleTime(); + + Map localParams = parameters.getLocalParametersMap(); + + if (globalParams == null && localParams == null) { + return null; + } + // if it is a complement, + // you need to pass in the task instance id to locate the time + // of the process instance complement + Map params = BusinessTimeUtils + .getBusinessTime(commandType, + scheduleTime); + + if (globalParamsMap != null) { + params.putAll(globalParamsMap); + } + + if (Strings.isNotBlank(taskExecutionContext.getExecutePath())) { + params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath()); + } + params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId())); + + if (globalParams != null && localParams != null) { + globalParams.putAll(localParams); + } else if (globalParams == null && localParams != null) { + globalParams = localParams; + } + Iterator> iter = globalParams.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry en = iter.next(); + Property property = en.getValue(); + + if (StringUtils.isNotEmpty(property.getValue()) + && property.getValue().startsWith("$")) { + /** + * local parameter refers to global parameter with the same name + * note: the global parameters of the process instance here are solidified parameters, + * and there are no variables in them. + */ + String val = property.getValue(); + val = ParameterUtils.convertParameterPlaceholders(val, params); + property.setValue(val); + } + } + + return globalParams; + } + /** * format convert * @param paramsMap params map diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index 8f3da4537d..b547ef9817 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -14,12 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task; -import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.slf4j.Logger; + +import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; @@ -29,6 +30,8 @@ import java.nio.file.Paths; import java.util.List; import java.util.function.Consumer; +import org.slf4j.Logger; + /** * shell command executor */ @@ -78,7 +81,6 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { return OSUtils.isWindows() ? CMD : SH; } - /** * create command file if not exists * @param execCommand exec command @@ -117,6 +119,4 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8); } } - - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index e193571ce0..f7887df41e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -35,8 +35,6 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; -import org.slf4j.Logger; - import java.io.File; import java.nio.file.Files; import java.nio.file.Path; @@ -51,6 +49,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.slf4j.Logger; + /** * shell task */ @@ -101,7 +101,8 @@ public class ShellTask extends AbstractTask { public void handle() throws Exception { try { // construct process - CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand()); + String command = buildCommand(); + CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(command); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); @@ -165,12 +166,8 @@ public class ShellTask extends AbstractTask { private String parseScript(String script) { // combining local and global parameters - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - shellParameters.getLocalParametersMap(), - shellParameters.getVarPoolMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + Map paramsMap = ParamUtils.convert(taskExecutionContext,shellParameters); + // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job if (taskExecutionContext.getScheduleTime() != null) { if (paramsMap == null) { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java index a9a1b89371..99a6eb2111 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java @@ -20,22 +20,30 @@ package org.apache.dolphinscheduler.server.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.Direct; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import java.util.Calendar; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.SerializationFeature; + /** * Test ParamUtils */ @@ -82,7 +90,6 @@ public class ParamUtilsTest { varProperty.setType(DataType.VARCHAR); varProperty.setValue("${global_param}"); varPoolParams.put("varPool", varProperty); - } /** @@ -90,7 +97,6 @@ public class ParamUtilsTest { */ @Test public void testConvert() { - //The expected value String expected = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," @@ -126,6 +132,83 @@ public class ParamUtilsTest { assertNull(paramsMap2); } + /** + * Test some new params related to task + */ + @Test + public void testConvertForParamsRelatedTask() throws Exception { + // start to form some test data for new paramters + Map globalParams = new HashMap<>(); + Map globalParamsMap = new HashMap<>(); + + Property taskInstanceIdProperty = new Property(); + String propName = "task_execution_id"; + String paramValue = String.format("${%s}", Constants.PARAMETER_TASK_INSTANCE_ID); + taskInstanceIdProperty.setProp(propName); + taskInstanceIdProperty.setDirect(Direct.IN); + taskInstanceIdProperty.setType(DataType.VARCHAR); + taskInstanceIdProperty.setValue(paramValue); + globalParams.put(propName,taskInstanceIdProperty); + globalParamsMap.put(propName,paramValue); + + Property taskExecutionPathProperty = new Property(); + propName = "task_execution_path"; + paramValue = String.format("${%s}", Constants.PARAMETER_TASK_EXECUTE_PATH); + taskExecutionPathProperty.setProp(propName); + taskExecutionPathProperty.setDirect(Direct.IN); + taskExecutionPathProperty.setType(DataType.VARCHAR); + taskExecutionPathProperty.setValue(paramValue); + + globalParams.put(propName,taskExecutionPathProperty); + globalParamsMap.put(propName,paramValue); + + Calendar calendar = Calendar.getInstance(); + calendar.set(2019,11,30); + Date date = calendar.getTime(); + + List globalParamList = globalParams.values().stream().collect(Collectors.toList()); + + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskName("params test"); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); + taskExecutionContext.setHost("127.0.0.1:1234"); + taskExecutionContext.setExecutePath("/tmp/test"); + taskExecutionContext.setLogPath("/log"); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setExecutorId(1); + taskExecutionContext.setCmdTypeIfComplement(0); + taskExecutionContext.setScheduleTime(date); + taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList)); + taskExecutionContext.setDefinedParams(globalParamsMap); + taskExecutionContext.setTaskParams( + "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\"," + + "\"localParams\":" + + "[{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${system.task.instance.id}\"}," + + "{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR" + + "\",\"value\":\"${system.task.execute.path}\"}],\"resourceList\":[]}"); + + ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class); + + //The expected value + String expected = "{\"task_execution_id\":{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"1\"}," + + "\"task_execution_path\":{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"/tmp/test\"}}"; + + //The expected value when globalParams is null but localParams is not null + Map paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters); + + String result = JSONUtils.toJsonString(paramsMap); + + Map resultMap = JSONUtils.parseObject(result,Map.class); + Map expectedMap = JSONUtils.parseObject(expected,Map.class); + + result = JSONUtils.toJsonString(resultMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); + expected = JSONUtils.toJsonString(expectedMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); + + assertEquals(expected, result); + + } + /** * Test the overload method of convert */ diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java index e224fa81c3..b4dfb0e869 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -14,17 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.shell; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Method; +package org.apache.dolphinscheduler.server.worker.shell; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; @@ -32,6 +27,13 @@ import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.util.Date; +import java.util.List; + import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -45,9 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import java.util.Date; -import java.util.List; - /** * python shell command executor test */ @@ -84,21 +83,21 @@ public class ShellCommandExecutorTest { taskProps.setTaskTimeout(360000); taskProps.setTaskInstanceId(7657); - - TaskInstance taskInstance = processService.findTaskInstanceById(7657); - -// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class); -// taskProps.setTaskParams(taskNode.getParams()); - - - // custom logger -// Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, -// taskInstance.getProcessDefinitionId(), -// taskInstance.getProcessInstanceId(), -// taskInstance.getId())); - - -// AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); + // TaskInstance taskInstance = processService.findTaskInstanceById(7657); + // + // TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class); + // taskProps.setTaskParams(taskNode.getParams()); + // + // + // // custom logger + // Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + // taskInstance.getProcessDefine().getCode(), + // taskInstance.getProcessDefine().getVersion(), + // taskInstance.getProcessInstanceId(), + // taskInstance.getId())); + // + // + // AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); AbstractTask task = null;