diff --git a/dolphinscheduler-dao/src/main/resources/datasource.properties b/dolphinscheduler-dao/src/main/resources/datasource.properties index 535b7493ce..0deb7fe00b 100644 --- a/dolphinscheduler-dao/src/main/resources/datasource.properties +++ b/dolphinscheduler-dao/src/main/resources/datasource.properties @@ -66,4 +66,4 @@ spring.datasource.password=test # open PSCache, specify count PSCache for every connection #spring.datasource.poolPreparedStatements=true -#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 +#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 721656730d..13c96c62a9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -174,6 +174,7 @@ public class TaskExecuteThread implements Runnable { responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); responseCommand.setVarPool(task.getVarPool()); + responseCommand.setResult(task.getResultString()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); } catch (Exception e) { logger.error("task scheduler failure", e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 443bd319ed..52f7363c66 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -54,6 +54,7 @@ import java.util.regex.Pattern; import org.slf4j.Logger; + /** * abstract command executor */ @@ -84,6 +85,11 @@ public abstract class AbstractCommandExecutor { */ protected final List logBuffer; + /** + * SHELL result string + */ + protected String resultString; + /** * taskExecutionContext */ @@ -223,6 +229,14 @@ public abstract class AbstractCommandExecutor { return varPool.toString(); } + public String getResultString() { + return resultString; + } + + public void setResultString(String result) { + this.resultString = result; + } + /** * cancel application * @@ -355,6 +369,7 @@ public abstract class AbstractCommandExecutor { varPool.append("$VarPool$"); } else { logBuffer.add(line); + resultString = line; lastFlushTime = flush(lastFlushTime); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index de7d35f404..68152e269f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -63,6 +63,11 @@ public abstract class AbstractTask { */ protected int processId; + /** + * SHELL result string + */ + protected String resultString; + /** * other resource manager appId , for example : YARN etc */ @@ -167,6 +172,14 @@ public abstract class AbstractTask { this.processId = processId; } + public String getResultString() { + return resultString; + } + + public void setResultString(String resultString) { + this.resultString = resultString; + } + /** * get task parameters * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 5cbd3c151f..5e61f89131 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -21,6 +21,7 @@ import static java.util.Calendar.DAY_OF_MONTH; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; @@ -34,6 +35,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.slf4j.Logger; + import java.io.File; import java.nio.file.Files; import java.nio.file.Path; @@ -41,13 +44,13 @@ import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; -import org.slf4j.Logger; - /** * shell task */ @@ -102,6 +105,7 @@ public class ShellTask extends AbstractTask { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); + setResult(shellCommandExecutor.getResultString()); } catch (Exception e) { logger.error("shell task error", e); setExitStatusCode(Constants.EXIT_CODE_FAILURE); @@ -183,4 +187,17 @@ public class ShellTask extends AbstractTask { } return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); } + + public void setResult(String result) { + Map localParams = shellParameters.getLocalParametersMap(); + List> outProperties = new ArrayList<>(); + Map p = new HashMap<>(); + localParams.forEach((k,v) -> { + if (v.getDirect() == Direct.OUT) { + p.put(k, result); + } + }); + outProperties.add(p); + resultString = JSONUtils.toJsonString(outProperties); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 4c328edbe0..cb56c0b4f5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; @@ -148,7 +149,7 @@ public class SqlTask extends AbstractTask { logger); // execute sql task - executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); + executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs, sqlParameters.getLocalParams()); setExitStatusCode(Constants.EXIT_CODE_SUCCESS); @@ -237,7 +238,8 @@ public class SqlTask extends AbstractTask { public void executeFuncAndSql(SqlBinds mainSqlBinds, List preStatementsBinds, List postStatementsBinds, - List createFuncs) { + List createFuncs, + List properties) { Connection connection = null; PreparedStatement stmt = null; ResultSet resultSet = null; @@ -253,18 +255,21 @@ public class SqlTask extends AbstractTask { preSql(connection, preStatementsBinds); stmt = prepareStatementAndBind(connection, mainSqlBinds); + String result = null; // decide whether to executeQuery or executeUpdate based on sqlType if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { // query statements need to be convert to JsonArray and inserted into Alert to send resultSet = stmt.executeQuery(); - resultProcess(resultSet); + result = resultProcess(resultSet); } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { // non query statement - stmt.executeUpdate(); + String updateResult = String.valueOf(stmt.executeUpdate()); + result = setNonQuerySqlReturn(updateResult, properties); } postSql(connection, postStatementsBinds); + this.setResultString(result); } catch (Exception e) { logger.error("execute sql error", e); @@ -274,13 +279,28 @@ public class SqlTask extends AbstractTask { } } + public String setNonQuerySqlReturn(String updateResult, List properties) { + String result = null; + for (Property info :properties) { + if (Direct.OUT == info.getDirect()) { + List> updateRL = new ArrayList<>(); + Map updateRM = new HashMap<>(); + updateRM.put(info.getProp(),updateResult); + updateRL.add(updateRM); + result = JSONUtils.toJsonString(updateRL); + break; + } + } + return result; + } + /** * result process * * @param resultSet resultSet * @throws Exception Exception */ - private void resultProcess(ResultSet resultSet) throws Exception { + private String resultProcess(ResultSet resultSet) throws Exception { ArrayNode resultJSONArray = JSONUtils.createArrayNode(); ResultSetMetaData md = resultSet.getMetaData(); int num = md.getColumnCount(); @@ -300,6 +320,7 @@ public class SqlTask extends AbstractTask { sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets", JSONUtils.toJsonString(resultJSONArray)); + return result; } /** diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 5a5561d1bd..c4e9e9f3f9 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -119,6 +119,11 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); taskCallbackService.sendAck(1, ackCommand.convert2Command()); + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); + String result = responseCommand.getResult(); + responseCommand.setResult("return string"); + taskCallbackService.sendResult(1, responseCommand.convert2Command()); + Stopper.stop(); nettyRemotingServer.close(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java index 6acfd180c2..24ed5b956d 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java @@ -17,13 +17,22 @@ package org.apache.dolphinscheduler.server.worker.task; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.sql.SqlParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask; +import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.junit.Assert; import org.junit.Before; @@ -116,4 +125,70 @@ public class TaskManagerTest { taskExecutionContext.setTaskType("XXX"); TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService); } + + @Test + public void testShellTaskReturnString() { + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskName("kris test"); + taskExecutionContext.setTaskType("SHELL"); + taskExecutionContext.setHost("127.0.0.1:1234"); + taskExecutionContext.setExecutePath("/tmp"); + taskExecutionContext.setLogPath("/log"); + taskExecutionContext.setTaskJson( + "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"" + + "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? " + + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\"," + + "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":" + + "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"}," + + "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}" + + "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":" + + "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]"); + taskExecutionContext.setExecutorId(1); + taskExecutionContext.setCmdTypeIfComplement(5); + taskExecutionContext.setTenantCode("roo"); + taskExecutionContext.setScheduleTime(new Date()); + taskExecutionContext.setQueue("default"); + taskExecutionContext.setTaskParams( + "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" + + + "[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" + + "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}"); + Map definedParams = new HashMap<>(); + definedParams.put("time_gb", "2020-12-16 00:00:00"); + taskExecutionContext.setDefinedParams(definedParams); + ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService); + shellTask.setResultString("shell return"); + String shellReturn = shellTask.getResultString(); + shellTask.init(); + shellTask.setResult(shellReturn); + Assert.assertSame(shellReturn, "shell return"); + } + + @Test + public void testSqlTaskReturnString() { + String params = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\"," + + "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"; + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskParams("{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}]," + + "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\"," + + "\"sqlType\":1}"); + taskExecutionContext.setExecutePath("/tmp"); + taskExecutionContext.setTaskAppId("1"); + taskExecutionContext.setTenantCode("root"); + taskExecutionContext.setStartTime(new Date()); + taskExecutionContext.setTaskTimeout(10000); + taskExecutionContext.setLogPath("/tmp/dx"); + + SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); + sqlTaskExecutionContext.setConnectionParams(params); + taskExecutionContext.setSqlTaskExecutionContext(sqlTaskExecutionContext); + SqlTask sqlTask = new SqlTask(taskExecutionContext, logger, null); + SqlParameters sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class); + List properties = sqlParameters.getLocalParams(); + sqlTask.setNonQuerySqlReturn("sql return", properties); + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskReturnTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskReturnTest.java new file mode 100644 index 0000000000..1af0a384f0 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskReturnTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task.shell; + +import static org.mockito.ArgumentMatchers.anyString; + +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; +import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * shell task return test. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ShellTask.class}) +public class ShellTaskReturnTest { + private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class); + + private ShellTask shellTask; + private ShellCommandExecutor shellCommandExecutor; + private TaskExecutionContext taskExecutionContext; + private CommandExecuteResult commandExecuteResult; + + @Before + public void before() throws Exception { + System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); + shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); + PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor); + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskName("kris test"); + taskExecutionContext.setTaskType("SHELL"); + taskExecutionContext.setHost("127.0.0.1:1234"); + taskExecutionContext.setExecutePath("/tmp"); + taskExecutionContext.setLogPath("/log"); + taskExecutionContext.setTaskJson( + "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"]," + + "\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false," + + "\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"" + + "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? " + + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\"," + + "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":" + + "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"}," + + "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}" + + "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1," + + "\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":" + + "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]"); + taskExecutionContext.setExecutorId(1); + taskExecutionContext.setCmdTypeIfComplement(5); + taskExecutionContext.setTenantCode("roo"); + taskExecutionContext.setScheduleTime(new Date()); + taskExecutionContext.setQueue("default"); + taskExecutionContext.setTaskParams( + "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" + + + "[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" + + "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}"); + Map definedParams = new HashMap<>(); + definedParams.put("time_gb", "2020-12-16 00:00:00"); + taskExecutionContext.setDefinedParams(definedParams); + PowerMockito.mockStatic(Files.class); + PowerMockito.when(Files.exists(Paths.get(anyString()))).thenReturn(true); + commandExecuteResult = new CommandExecuteResult(); + commandExecuteResult.setAppIds("appId"); + commandExecuteResult.setExitStatusCode(0); + commandExecuteResult.setProcessId(1); + } + + @Test + public void testShellReturnString() { + shellTask = new ShellTask(taskExecutionContext, logger); + shellTask.init(); + try { + PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); + } catch (Exception e) { + e.printStackTrace(); + } + shellTask.setResult("shell return string"); + logger.info("shell return string:{}", shellTask.getResultString()); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java index c5f2de82ea..a8e9e70ded 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java @@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.task.shell; import static org.mockito.ArgumentMatchers.anyString; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.nio.file.Files; import java.nio.file.Paths; +import java.sql.DriverManager; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory; * shell task test. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ShellTask.class}) +@PrepareForTest(value = {ShellTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class}) public class ShellTaskTest { private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class); @@ -57,6 +60,7 @@ public class ShellTaskTest { System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor); + shellCommandExecutor.setResultString("shellReturn"); taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskName("kris test"); @@ -68,7 +72,7 @@ public class ShellTaskTest { "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"" + "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\",\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? " - + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":" + + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":" + "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}" + "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":" + "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); @@ -82,7 +86,7 @@ public class ShellTaskTest { taskExecutionContext.setTaskParams( "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" + - "[{\"prop\":\"time1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" + "[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" + "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}"); Map definedParams = new HashMap<>(); definedParams.put("time_gb", "2020-12-16 00:00:00"); @@ -111,4 +115,12 @@ public class ShellTaskTest { PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); shellTask.handle(); } + + @Test + public void testSetResult() { + shellTask = new ShellTask(taskExecutionContext, logger); + shellTask.init(); + String r = "return"; + shellTask.setResult(r); + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java index 64db568916..2abb91c6b6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java @@ -19,10 +19,12 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.sql.Connection; import java.sql.DriverManager; @@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory; * sql task test */ @RunWith(PowerMockRunner.class) -@PrepareForTest(value = {SqlTask.class, DriverManager.class}) +@PrepareForTest(value = {SqlTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class}) public class SqlTaskTest { private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class); @@ -70,7 +72,9 @@ public class SqlTaskTest { props.setTaskStartTime(new Date()); props.setTaskTimeout(0); props.setTaskParams( - "{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\",\"sqlType\":1}"); + "{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}]," + + "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\"," + + "\"sqlType\":1}"); taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams()); @@ -85,6 +89,8 @@ public class SqlTaskTest { sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS); PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext); + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao()); alertClientService = PowerMockito.mock(AlertClientService.class); sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService); sqlTask.init(); @@ -95,7 +101,7 @@ public class SqlTaskTest { Assert.assertNotNull(sqlTask.getParameters()); } - @Test(expected = Exception.class) + @Test public void testHandle() throws Exception { Connection connection = PowerMockito.mock(Connection.class); PowerMockito.mockStatic(DriverManager.class); diff --git a/dolphinscheduler-ui/.env b/dolphinscheduler-ui/.env index e676be6059..e6b00a11d7 100644 --- a/dolphinscheduler-ui/.env +++ b/dolphinscheduler-ui/.env @@ -14,7 +14,7 @@ # limitations under the License. # back end interface address -API_BASE = http://192.168.xx.xx:12345 +API_BASE = http://127.0.0.1:12345 # If IP access is required for local development, remove the "#" #DEV_HOST = 192.168.xx.xx