diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java index 2a08734aab..034217ef82 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java @@ -66,11 +66,13 @@ public class ParameterUtils { } Date cronTime; if (parameterMap != null && !parameterMap.isEmpty()) { + // replace variable ${} form,refers to the replacement of system variables and custom variables + parameterString = PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true); + } + if (parameterMap != null && null != parameterMap.get(Constants.PARAMETER_DATETIME)) { //Get current time, schedule execute time String cronTimeStr = parameterMap.get(Constants.PARAMETER_DATETIME); cronTime = DateUtils.parse(cronTimeStr, Constants.PARAMETER_FORMAT_TIME); - // replace variable ${} form,refers to the replacement of system variables and custom variables - parameterString = PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true); } else { cronTime = new Date(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 835fe40028..5cbd3c151f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -17,11 +17,14 @@ package org.apache.dolphinscheduler.server.worker.task.shell; +import static java.util.Calendar.DAY_OF_MONTH; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; @@ -38,6 +41,8 @@ import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; +import java.util.Date; +import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -129,14 +134,7 @@ public class ShellTask extends AbstractTask { } String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n"); - // combining local and global parameters - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - shellParameters.getLocalParametersMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); - // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job - script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); + script = parseScript(script); shellParameters.setRawScript(script); logger.info("raw script : {}", shellParameters.getRawScript()); @@ -161,4 +159,28 @@ public class ShellTask extends AbstractTask { return shellParameters; } + private String parseScript(String script) { + // combining local and global parameters + Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), + taskExecutionContext.getDefinedParams(), + shellParameters.getLocalParametersMap(), + CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), + taskExecutionContext.getScheduleTime()); + // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job + if (taskExecutionContext.getScheduleTime() != null) { + if (paramsMap == null) { + paramsMap = new HashMap<>(); + } + Date date = taskExecutionContext.getScheduleTime(); + if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) { + date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1); + } + String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME); + Property p = new Property(); + p.setValue(dateTime); + p.setProp(Constants.PARAMETER_DATETIME); + paramsMap.put(Constants.PARAMETER_DATETIME, p); + } + return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java index c30f33c683..c5f2de82ea 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java @@ -14,227 +14,101 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.shell; -import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.DbType; -import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.dao.entity.DataSource; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import static org.mockito.ArgumentMatchers.anyString; + import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.junit.*; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.context.ApplicationContext; - -import java.util.Date; /** - * shell task test + * shell task test. */ @RunWith(PowerMockRunner.class) -@PrepareForTest(OSUtils.class) -@PowerMockIgnore({"javax.management.*"}) +@PrepareForTest({ShellTask.class}) public class ShellTaskTest { private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class); private ShellTask shellTask; - - private ProcessService processService; - private ShellCommandExecutor shellCommandExecutor; - - private ApplicationContext applicationContext; private TaskExecutionContext taskExecutionContext; + private CommandExecuteResult commandExecuteResult; @Before public void before() throws Exception { - taskExecutionContext = new TaskExecutionContext(); - - PowerMockito.mockStatic(OSUtils.class); - processService = PowerMockito.mock(ProcessService.class); + System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); - - applicationContext = PowerMockito.mock(ApplicationContext.class); - SpringApplicationContext springApplicationContext = new SpringApplicationContext(); - springApplicationContext.setApplicationContext(applicationContext); - PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); - - TaskProps props = new TaskProps(); - props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTenantCode("1"); - props.setEnvFile(".dolphinscheduler_env.sh"); - props.setTaskStartTime(new Date()); - props.setTaskTimeout(0); - props.setTaskParams("{\"rawScript\": \" echo 'hello world!'\"}"); - shellTask = new ShellTask(taskExecutionContext, logger); - shellTask.init(); - - PowerMockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource()); - PowerMockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); - PowerMockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); - - String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(), - props.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh"); - PowerMockito.when(shellCommandExecutor.run("")).thenReturn(null); - } - - private DataSource getDataSource() { - DataSource dataSource = new DataSource(); - dataSource.setType(DbType.MYSQL); - dataSource.setConnectionParams( - "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"); - dataSource.setUserId(1); - return dataSource; - } - - private ProcessInstance getProcessInstance() { - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setCommandType(CommandType.START_PROCESS); - processInstance.setScheduleTime(new Date()); - return processInstance; - } - - @After - public void after() {} - - /** - * Method: ShellTask() - */ - @Test - public void testShellTask() - throws Exception { - TaskProps props = new TaskProps(); - props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTenantCode("1"); - ShellTask shellTaskTest = new ShellTask(taskExecutionContext, logger); - Assert.assertNotNull(shellTaskTest); - } - - /** - * Method: init for Unix-like - */ - @Test - public void testInitForUnix() { - try { - PowerMockito.when(OSUtils.isWindows()).thenReturn(false); - shellTask.init(); - Assert.assertTrue(true); - } catch (Error | Exception e) { - logger.error(e.getMessage()); - } - } - - @Test - public void testInitException() { - TaskProps props = new TaskProps(); - props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTenantCode("1"); - props.setEnvFile(".dolphinscheduler_env.sh"); - props.setTaskStartTime(new Date()); - props.setTaskTimeout(0); - props.setTaskParams("{\"rawScript\": \"\"}"); - ShellTask shellTask = new ShellTask(taskExecutionContext, logger); - try { - shellTask.init(); - } catch (Exception e) { - logger.info(e.getMessage(), e); - if (e.getMessage().contains("shell task params is not valid")) { - Assert.assertTrue(true); - } - } - } - - /** - * Method: init for Windows - */ - @Test - public void testInitForWindows() { - try { - PowerMockito.when(OSUtils.isWindows()).thenReturn(true); - shellTask.init(); - Assert.assertTrue(true); - } catch (Error | Exception e) { - logger.error(e.getMessage()); - } - } - - /** - * Method: handle() for Unix-like - */ - @Test - public void testHandleForUnix() throws Exception { - try { - PowerMockito.when(OSUtils.isWindows()).thenReturn(false); - TaskProps props = new TaskProps(); - props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTenantCode("1"); - props.setEnvFile(".dolphinscheduler_env.sh"); - props.setTaskStartTime(new Date()); - props.setTaskTimeout(0); - props.setScheduleTime(new Date()); - props.setCmdTypeIfComplement(CommandType.START_PROCESS); - props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}"); - ShellTask shellTask1 = new ShellTask(taskExecutionContext, logger); - shellTask1.init(); - shellTask1.handle(); - Assert.assertTrue(true); - } catch (Error | Exception e) { - if (!e.getMessage().contains("process error . exitCode is : -1") - && !System.getProperty("os.name").startsWith("Windows")) { - logger.error(e.getMessage()); - } - } + PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor); + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskName("kris test"); + taskExecutionContext.setTaskType("SHELL"); + taskExecutionContext.setHost("127.0.0.1:1234"); + taskExecutionContext.setExecutePath("/tmp"); + taskExecutionContext.setLogPath("/log"); + taskExecutionContext.setTaskJson( + "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"" + + + "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\",\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? " + + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":" + + "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}" + + "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":" + + "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]"); + taskExecutionContext.setExecutorId(1); + taskExecutionContext.setCmdTypeIfComplement(5); + taskExecutionContext.setTenantCode("roo"); + taskExecutionContext.setScheduleTime(new Date()); + taskExecutionContext.setQueue("default"); + taskExecutionContext.setTaskParams( + "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" + + + "[{\"prop\":\"time1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" + + "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}"); + Map definedParams = new HashMap<>(); + definedParams.put("time_gb", "2020-12-16 00:00:00"); + taskExecutionContext.setDefinedParams(definedParams); + PowerMockito.mockStatic(Files.class); + PowerMockito.when(Files.exists(Paths.get(anyString()))).thenReturn(true); + commandExecuteResult = new CommandExecuteResult(); + commandExecuteResult.setAppIds("appId"); + commandExecuteResult.setExitStatusCode(0); + commandExecuteResult.setProcessId(1); } - /** - * Method: handle() for Windows - */ @Test - public void testHandleForWindows() throws Exception { - try { - Assume.assumeTrue(OSUtils.isWindows()); - TaskProps props = new TaskProps(); - props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTenantCode("1"); - props.setEnvFile(".dolphinscheduler_env.sh"); - props.setTaskStartTime(new Date()); - props.setTaskTimeout(0); - props.setScheduleTime(new Date()); - props.setCmdTypeIfComplement(CommandType.START_PROCESS); - props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}"); - ShellTask shellTask1 = new ShellTask(taskExecutionContext, logger); - shellTask1.init(); - shellTask1.handle(); - Assert.assertTrue(true); - } catch (Error | Exception e) { - if (!e.getMessage().contains("process error . exitCode is : -1")) { - logger.error(e.getMessage()); - } - } + public void testComplementData() throws Exception { + shellTask = new ShellTask(taskExecutionContext, logger); + shellTask.init(); + PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); + shellTask.handle(); } - /** - * Method: cancelApplication() - */ @Test - public void testCancelApplication() throws Exception { - try { - shellTask.cancelApplication(true); - Assert.assertTrue(true); - } catch (Error | Exception e) { - logger.error(e.getMessage()); - } + public void testStartProcess() throws Exception { + taskExecutionContext.setCmdTypeIfComplement(0); + shellTask = new ShellTask(taskExecutionContext, logger); + shellTask.init(); + PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); + shellTask.handle(); } - } diff --git a/pom.xml b/pom.xml index 4eda05354a..90a58a1361 100644 --- a/pom.xml +++ b/pom.xml @@ -853,6 +853,7 @@ **/server/worker/task/sqoop/SqoopTaskTest.java + **/server/worker/task/shell/ShellTaskTest.java **/server/worker/task/TaskManagerTest.java **/server/worker/EnvFileTest.java **/server/worker/runner/TaskExecuteThreadTest.java