Browse Source

Merge pull request #4237 from CalvinKirs/fix_date_convert_null

[FIX-3177]Task time parameter parsing error
pull/3/MERGE
xingchun-chen 4 years ago committed by GitHub
parent
commit
55447348ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
  2. 38
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  3. 252
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
  4. 1
      pom.xml

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java

@ -66,11 +66,13 @@ public class ParameterUtils {
} }
Date cronTime; Date cronTime;
if (parameterMap != null && !parameterMap.isEmpty()) { if (parameterMap != null && !parameterMap.isEmpty()) {
// replace variable ${} form,refers to the replacement of system variables and custom variables
parameterString = PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true);
}
if (parameterMap != null && null != parameterMap.get(Constants.PARAMETER_DATETIME)) {
//Get current time, schedule execute time //Get current time, schedule execute time
String cronTimeStr = parameterMap.get(Constants.PARAMETER_DATETIME); String cronTimeStr = parameterMap.get(Constants.PARAMETER_DATETIME);
cronTime = DateUtils.parse(cronTimeStr, Constants.PARAMETER_FORMAT_TIME); cronTime = DateUtils.parse(cronTimeStr, Constants.PARAMETER_FORMAT_TIME);
// replace variable ${} form,refers to the replacement of system variables and custom variables
parameterString = PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true);
} else { } else {
cronTime = new Date(); cronTime = new Date();
} }

38
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -17,11 +17,14 @@
package org.apache.dolphinscheduler.server.worker.task.shell; package org.apache.dolphinscheduler.server.worker.task.shell;
import static java.util.Calendar.DAY_OF_MONTH;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
@ -38,6 +41,8 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.PosixFilePermissions;
import java.util.Date;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -129,14 +134,7 @@ public class ShellTask extends AbstractTask {
} }
String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n"); String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n");
// combining local and global parameters script = parseScript(script);
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
shellParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
shellParameters.setRawScript(script); shellParameters.setRawScript(script);
logger.info("raw script : {}", shellParameters.getRawScript()); logger.info("raw script : {}", shellParameters.getRawScript());
@ -161,4 +159,28 @@ public class ShellTask extends AbstractTask {
return shellParameters; return shellParameters;
} }
private String parseScript(String script) {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
shellParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) {
if (paramsMap == null) {
paramsMap = new HashMap<>();
}
Date date = taskExecutionContext.getScheduleTime();
if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
}
String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_DATETIME);
paramsMap.put(Constants.PARAMETER_DATETIME, p);
}
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
} }

252
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java

@ -14,227 +14,101 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.task.shell; package org.apache.dolphinscheduler.server.worker.task.shell;
import org.apache.dolphinscheduler.common.enums.CommandType; import static org.mockito.ArgumentMatchers.anyString;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.nio.file.Files;
import org.apache.dolphinscheduler.service.process.ProcessService; import java.nio.file.Paths;
import org.junit.*; import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito; import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.util.Date;
/** /**
* shell task test * shell task test.
*/ */
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest(OSUtils.class) @PrepareForTest({ShellTask.class})
@PowerMockIgnore({"javax.management.*"})
public class ShellTaskTest { public class ShellTaskTest {
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class); private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
private ShellTask shellTask; private ShellTask shellTask;
private ProcessService processService;
private ShellCommandExecutor shellCommandExecutor; private ShellCommandExecutor shellCommandExecutor;
private ApplicationContext applicationContext;
private TaskExecutionContext taskExecutionContext; private TaskExecutionContext taskExecutionContext;
private CommandExecuteResult commandExecuteResult;
@Before @Before
public void before() throws Exception { public void before() throws Exception {
taskExecutionContext = new TaskExecutionContext(); System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
PowerMockito.mockStatic(OSUtils.class);
processService = PowerMockito.mock(ProcessService.class);
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
applicationContext = PowerMockito.mock(ApplicationContext.class); taskExecutionContext = new TaskExecutionContext();
SpringApplicationContext springApplicationContext = new SpringApplicationContext(); taskExecutionContext.setTaskInstanceId(1);
springApplicationContext.setApplicationContext(applicationContext); taskExecutionContext.setTaskName("kris test");
PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); taskExecutionContext.setTaskType("SHELL");
taskExecutionContext.setHost("127.0.0.1:1234");
TaskProps props = new TaskProps(); taskExecutionContext.setExecutePath("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); taskExecutionContext.setLogPath("/log");
props.setTenantCode("1"); taskExecutionContext.setTaskJson(
props.setEnvFile(".dolphinscheduler_env.sh"); "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
props.setTaskStartTime(new Date()); +
props.setTaskTimeout(0); "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\",\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
props.setTaskParams("{\"rawScript\": \" echo 'hello world!'\"}"); + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":"
shellTask = new ShellTask(taskExecutionContext, logger); + "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
shellTask.init(); + "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
PowerMockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource()); taskExecutionContext.setProcessInstanceId(1);
PowerMockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]");
PowerMockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(5);
String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(), taskExecutionContext.setTenantCode("roo");
props.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh"); taskExecutionContext.setScheduleTime(new Date());
PowerMockito.when(shellCommandExecutor.run("")).thenReturn(null); taskExecutionContext.setQueue("default");
} taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
private DataSource getDataSource() { +
DataSource dataSource = new DataSource(); "[{\"prop\":\"time1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
dataSource.setType(DbType.MYSQL); + "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
dataSource.setConnectionParams( Map<String, String> definedParams = new HashMap<>();
"{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"); definedParams.put("time_gb", "2020-12-16 00:00:00");
dataSource.setUserId(1); taskExecutionContext.setDefinedParams(definedParams);
return dataSource; PowerMockito.mockStatic(Files.class);
} PowerMockito.when(Files.exists(Paths.get(anyString()))).thenReturn(true);
commandExecuteResult = new CommandExecuteResult();
private ProcessInstance getProcessInstance() { commandExecuteResult.setAppIds("appId");
ProcessInstance processInstance = new ProcessInstance(); commandExecuteResult.setExitStatusCode(0);
processInstance.setCommandType(CommandType.START_PROCESS); commandExecuteResult.setProcessId(1);
processInstance.setScheduleTime(new Date());
return processInstance;
}
@After
public void after() {}
/**
* Method: ShellTask()
*/
@Test
public void testShellTask()
throws Exception {
TaskProps props = new TaskProps();
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTenantCode("1");
ShellTask shellTaskTest = new ShellTask(taskExecutionContext, logger);
Assert.assertNotNull(shellTaskTest);
}
/**
* Method: init for Unix-like
*/
@Test
public void testInitForUnix() {
try {
PowerMockito.when(OSUtils.isWindows()).thenReturn(false);
shellTask.init();
Assert.assertTrue(true);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
} }
@Test @Test
public void testInitException() { public void testComplementData() throws Exception {
TaskProps props = new TaskProps(); shellTask = new ShellTask(taskExecutionContext, logger);
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams("{\"rawScript\": \"\"}");
ShellTask shellTask = new ShellTask(taskExecutionContext, logger);
try {
shellTask.init(); shellTask.init();
} catch (Exception e) { PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
logger.info(e.getMessage(), e); shellTask.handle();
if (e.getMessage().contains("shell task params is not valid")) {
Assert.assertTrue(true);
}
}
} }
/**
* Method: init for Windows
*/
@Test @Test
public void testInitForWindows() { public void testStartProcess() throws Exception {
try { taskExecutionContext.setCmdTypeIfComplement(0);
PowerMockito.when(OSUtils.isWindows()).thenReturn(true); shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init(); shellTask.init();
Assert.assertTrue(true); PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
} catch (Error | Exception e) { shellTask.handle();
logger.error(e.getMessage());
}
}
/**
* Method: handle() for Unix-like
*/
@Test
public void testHandleForUnix() throws Exception {
try {
PowerMockito.when(OSUtils.isWindows()).thenReturn(false);
TaskProps props = new TaskProps();
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setScheduleTime(new Date());
props.setCmdTypeIfComplement(CommandType.START_PROCESS);
props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}");
ShellTask shellTask1 = new ShellTask(taskExecutionContext, logger);
shellTask1.init();
shellTask1.handle();
Assert.assertTrue(true);
} catch (Error | Exception e) {
if (!e.getMessage().contains("process error . exitCode is : -1")
&& !System.getProperty("os.name").startsWith("Windows")) {
logger.error(e.getMessage());
} }
}
}
/**
* Method: handle() for Windows
*/
@Test
public void testHandleForWindows() throws Exception {
try {
Assume.assumeTrue(OSUtils.isWindows());
TaskProps props = new TaskProps();
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setScheduleTime(new Date());
props.setCmdTypeIfComplement(CommandType.START_PROCESS);
props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}");
ShellTask shellTask1 = new ShellTask(taskExecutionContext, logger);
shellTask1.init();
shellTask1.handle();
Assert.assertTrue(true);
} catch (Error | Exception e) {
if (!e.getMessage().contains("process error . exitCode is : -1")) {
logger.error(e.getMessage());
}
}
}
/**
* Method: cancelApplication()
*/
@Test
public void testCancelApplication() throws Exception {
try {
shellTask.cancelApplication(true);
Assert.assertTrue(true);
} catch (Error | Exception e) {
logger.error(e.getMessage());
}
}
} }

1
pom.xml

@ -853,6 +853,7 @@
<!--<include>**/server/worker/task/datax/DataxTaskTest.java</include>--> <!--<include>**/server/worker/task/datax/DataxTaskTest.java</include>-->
<!--<include>**/server/worker/task/http/HttpTaskTest.java</include>--> <!--<include>**/server/worker/task/http/HttpTaskTest.java</include>-->
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include> <include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include>
<include>**/server/worker/task/TaskManagerTest.java</include> <include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/EnvFileTest.java</include> <include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include> <include>**/server/worker/runner/TaskExecuteThreadTest.java</include>

Loading…
Cancel
Save