Browse Source

[Improvement-5773][server] need to support two parameters related to task (#5774)

* add some new parameter for task

* restore official properties

* improve imports

* modify a variable's name

Co-authored-by: jiang hua <jiang.hua@zhaopin.com.cn>
2.0.7-release
Hua Jiang 3 years ago committed by GitHub
parent
commit
9fd5145b66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  3. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
  4. 67
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
  5. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
  6. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  7. 87
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
  8. 47
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -660,6 +660,16 @@ public final class Constants {
*/
public static final String PARAMETER_BUSINESS_DATE = "system.biz.date";
/**
* the absolute path of current executing task
*/
public static final String PARAMETER_TASK_EXECUTE_PATH = "system.task.execute.path";
/**
* the instance id of current task
*/
public static final String PARAMETER_TASK_INSTANCE_ID = "system.task.instance.id";
/**
* ACCEPTED
*/

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -31,12 +31,12 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java

@ -144,4 +144,5 @@ public class LoggerUtils {
, String info) {
optionalLogger.ifPresent((Logger logger) -> logger.info(info));
}
}

67
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java

@ -17,13 +17,19 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.logging.log4j.util.Strings;
import java.util.Date;
import java.util.HashMap;
@ -95,6 +101,67 @@ public class ParamUtils {
return globalParams;
}
/**
* parameter conversion
* @param taskExecutionContext the context of this task instance
* @param parameters the parameters
* @return global params
*/
public static Map<String,Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) {
Preconditions.checkNotNull(taskExecutionContext);
Preconditions.checkNotNull(parameters);
Map<String,Property> globalParams = getUserDefParamsMap(taskExecutionContext.getDefinedParams());
Map<String,String> globalParamsMap = taskExecutionContext.getDefinedParams();
CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
Date scheduleTime = taskExecutionContext.getScheduleTime();
Map<String,Property> localParams = parameters.getLocalParametersMap();
if (globalParams == null && localParams == null) {
return null;
}
// if it is a complement,
// you need to pass in the task instance id to locate the time
// of the process instance complement
Map<String,String> params = BusinessTimeUtils
.getBusinessTime(commandType,
scheduleTime);
if (globalParamsMap != null) {
params.putAll(globalParamsMap);
}
if (Strings.isNotBlank(taskExecutionContext.getExecutePath())) {
params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath());
}
params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId()));
if (globalParams != null && localParams != null) {
globalParams.putAll(localParams);
} else if (globalParams == null && localParams != null) {
globalParams = localParams;
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
Property property = en.getValue();
if (StringUtils.isNotEmpty(property.getValue())
&& property.getValue().startsWith("$")) {
/**
* local parameter refers to global parameter with the same name
* note: the global parameters of the process instance here are solidified parameters,
* and there are no variables in them.
*/
String val = property.getValue();
val = ParameterUtils.convertParameterPlaceholders(val, params);
property.setValue(val);
}
}
return globalParams;
}
/**
* format convert
* @param paramsMap params map

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java

@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.slf4j.Logger;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
@ -29,6 +30,8 @@ import java.nio.file.Paths;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
/**
* shell command executor
*/
@ -78,7 +81,6 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
return OSUtils.isWindows() ? CMD : SH;
}
/**
* create command file if not exists
* @param execCommand exec command
@ -117,6 +119,4 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
}
}
}

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -35,8 +35,6 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.slf4j.Logger;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
@ -51,6 +49,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
/**
* shell task
*/
@ -101,7 +101,8 @@ public class ShellTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
String command = buildCommand();
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
@ -165,12 +166,8 @@ public class ShellTask extends AbstractTask {
private String parseScript(String script) {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
shellParameters.getLocalParametersMap(),
shellParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,shellParameters);
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) {
if (paramsMap == null) {

87
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java

@ -20,22 +20,30 @@ package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
* Test ParamUtils
*/
@ -82,7 +90,6 @@ public class ParamUtilsTest {
varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}");
varPoolParams.put("varPool", varProperty);
}
/**
@ -90,7 +97,6 @@ public class ParamUtilsTest {
*/
@Test
public void testConvert() {
//The expected value
String expected = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
@ -126,6 +132,83 @@ public class ParamUtilsTest {
assertNull(paramsMap2);
}
/**
* Test some new params related to task
*/
@Test
public void testConvertForParamsRelatedTask() throws Exception {
// start to form some test data for new paramters
Map<String,Property> globalParams = new HashMap<>();
Map<String,String> globalParamsMap = new HashMap<>();
Property taskInstanceIdProperty = new Property();
String propName = "task_execution_id";
String paramValue = String.format("${%s}", Constants.PARAMETER_TASK_INSTANCE_ID);
taskInstanceIdProperty.setProp(propName);
taskInstanceIdProperty.setDirect(Direct.IN);
taskInstanceIdProperty.setType(DataType.VARCHAR);
taskInstanceIdProperty.setValue(paramValue);
globalParams.put(propName,taskInstanceIdProperty);
globalParamsMap.put(propName,paramValue);
Property taskExecutionPathProperty = new Property();
propName = "task_execution_path";
paramValue = String.format("${%s}", Constants.PARAMETER_TASK_EXECUTE_PATH);
taskExecutionPathProperty.setProp(propName);
taskExecutionPathProperty.setDirect(Direct.IN);
taskExecutionPathProperty.setType(DataType.VARCHAR);
taskExecutionPathProperty.setValue(paramValue);
globalParams.put(propName,taskExecutionPathProperty);
globalParamsMap.put(propName,paramValue);
Calendar calendar = Calendar.getInstance();
calendar.set(2019,11,30);
Date date = calendar.getTime();
List<Property> globalParamList = globalParams.values().stream().collect(Collectors.toList());
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("params test");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp/test");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(0);
taskExecutionContext.setScheduleTime(date);
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
taskExecutionContext.setDefinedParams(globalParamsMap);
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\","
+ "\"localParams\":"
+ "[{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${system.task.instance.id}\"},"
+ "{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ "\",\"value\":\"${system.task.execute.path}\"}],\"resourceList\":[]}");
ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class);
//The expected value
String expected = "{\"task_execution_id\":{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"1\"},"
+ "\"task_execution_path\":{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"/tmp/test\"}}";
//The expected value when globalParams is null but localParams is not null
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters);
String result = JSONUtils.toJsonString(paramsMap);
Map<String,String> resultMap = JSONUtils.parseObject(result,Map.class);
Map<String,String> expectedMap = JSONUtils.parseObject(expected,Map.class);
result = JSONUtils.toJsonString(resultMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
expected = JSONUtils.toJsonString(expectedMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
assertEquals(expected, result);
}
/**
* Test the overload method of convert
*/

47
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java

@ -14,17 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.shell;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
package org.apache.dolphinscheduler.server.worker.shell;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@ -32,6 +27,13 @@ import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@ -45,9 +47,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.util.Date;
import java.util.List;
/**
* python shell command executor test
*/
@ -84,21 +83,21 @@ public class ShellCommandExecutorTest {
taskProps.setTaskTimeout(360000);
taskProps.setTaskInstanceId(7657);
TaskInstance taskInstance = processService.findTaskInstanceById(7657);
// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
// taskProps.setTaskParams(taskNode.getParams());
// custom logger
// Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
// taskInstance.getProcessDefinitionId(),
// taskInstance.getProcessInstanceId(),
// taskInstance.getId()));
// AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
// TaskInstance taskInstance = processService.findTaskInstanceById(7657);
//
// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
// taskProps.setTaskParams(taskNode.getParams());
//
//
// // custom logger
// Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
// taskInstance.getProcessDefine().getCode(),
// taskInstance.getProcessDefine().getVersion(),
// taskInstance.getProcessInstanceId(),
// taskInstance.getId()));
//
//
// AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
AbstractTask task = null;

Loading…
Cancel
Save