Browse Source

[Feature-#3805][server-worker]global params of worker (#4679)

* return shell output to master node

* return sql output to master node

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug
pull/3/MERGE
fanwq 3 years ago committed by GitHub
parent
commit
559efac58d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-dao/src/main/resources/datasource.properties
  2. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  3. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  4. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  5. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  6. 31
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  7. 5
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  8. 75
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
  9. 115
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskReturnTest.java
  10. 18
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
  11. 12
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
  12. 2
      dolphinscheduler-ui/.env

2
dolphinscheduler-dao/src/main/resources/datasource.properties

@ -66,4 +66,4 @@ spring.datasource.password=test
# open PSCache, specify count PSCache for every connection # open PSCache, specify count PSCache for every connection
#spring.datasource.poolPreparedStatements=true #spring.datasource.poolPreparedStatements=true
#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 #spring.datasource.maxPoolPreparedStatementPerConnectionSize=20

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -174,6 +174,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId()); responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds()); responseCommand.setAppIds(task.getAppIds());
responseCommand.setVarPool(task.getVarPool()); responseCommand.setVarPool(task.getVarPool());
responseCommand.setResult(task.getResultString());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
} catch (Exception e) { } catch (Exception e) {
logger.error("task scheduler failure", e); logger.error("task scheduler failure", e);

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -54,6 +54,7 @@ import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
/** /**
* abstract command executor * abstract command executor
*/ */
@ -84,6 +85,11 @@ public abstract class AbstractCommandExecutor {
*/ */
protected final List<String> logBuffer; protected final List<String> logBuffer;
/**
* SHELL result string
*/
protected String resultString;
/** /**
* taskExecutionContext * taskExecutionContext
*/ */
@ -223,6 +229,14 @@ public abstract class AbstractCommandExecutor {
return varPool.toString(); return varPool.toString();
} }
public String getResultString() {
return resultString;
}
public void setResultString(String result) {
this.resultString = result;
}
/** /**
* cancel application * cancel application
* *
@ -355,6 +369,7 @@ public abstract class AbstractCommandExecutor {
varPool.append("$VarPool$"); varPool.append("$VarPool$");
} else { } else {
logBuffer.add(line); logBuffer.add(line);
resultString = line;
lastFlushTime = flush(lastFlushTime); lastFlushTime = flush(lastFlushTime);
} }
} }

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -63,6 +63,11 @@ public abstract class AbstractTask {
*/ */
protected int processId; protected int processId;
/**
* SHELL result string
*/
protected String resultString;
/** /**
* other resource manager appId , for example : YARN etc * other resource manager appId , for example : YARN etc
*/ */
@ -167,6 +172,14 @@ public abstract class AbstractTask {
this.processId = processId; this.processId = processId;
} }
public String getResultString() {
return resultString;
}
public void setResultString(String resultString) {
this.resultString = resultString;
}
/** /**
* get task parameters * get task parameters
* *

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -21,6 +21,7 @@ import static java.util.Calendar.DAY_OF_MONTH;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@ -34,6 +35,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.slf4j.Logger;
import java.io.File; import java.io.File;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@ -41,13 +44,13 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.slf4j.Logger;
/** /**
* shell task * shell task
*/ */
@ -102,6 +105,7 @@ public class ShellTask extends AbstractTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
setResult(shellCommandExecutor.getResultString());
} catch (Exception e) { } catch (Exception e) {
logger.error("shell task error", e); logger.error("shell task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE); setExitStatusCode(Constants.EXIT_CODE_FAILURE);
@ -183,4 +187,17 @@ public class ShellTask extends AbstractTask {
} }
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
} }
public void setResult(String result) {
Map<String, Property> localParams = shellParameters.getLocalParametersMap();
List<Map<String, String>> outProperties = new ArrayList<>();
Map<String, String> p = new HashMap<>();
localParams.forEach((k,v) -> {
if (v.getDirect() == Direct.OUT) {
p.put(k, result);
}
});
outProperties.add(p);
resultString = JSONUtils.toJsonString(outProperties);
}
} }

31
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
@ -148,7 +149,7 @@ public class SqlTask extends AbstractTask {
logger); logger);
// execute sql task // execute sql task
executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs, sqlParameters.getLocalParams());
setExitStatusCode(Constants.EXIT_CODE_SUCCESS); setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
@ -237,7 +238,8 @@ public class SqlTask extends AbstractTask {
public void executeFuncAndSql(SqlBinds mainSqlBinds, public void executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds, List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds, List<SqlBinds> postStatementsBinds,
List<String> createFuncs) { List<String> createFuncs,
List<Property> properties) {
Connection connection = null; Connection connection = null;
PreparedStatement stmt = null; PreparedStatement stmt = null;
ResultSet resultSet = null; ResultSet resultSet = null;
@ -253,18 +255,21 @@ public class SqlTask extends AbstractTask {
preSql(connection, preStatementsBinds); preSql(connection, preStatementsBinds);
stmt = prepareStatementAndBind(connection, mainSqlBinds); stmt = prepareStatementAndBind(connection, mainSqlBinds);
String result = null;
// decide whether to executeQuery or executeUpdate based on sqlType // decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send // query statements need to be convert to JsonArray and inserted into Alert to send
resultSet = stmt.executeQuery(); resultSet = stmt.executeQuery();
resultProcess(resultSet); result = resultProcess(resultSet);
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
// non query statement // non query statement
stmt.executeUpdate(); String updateResult = String.valueOf(stmt.executeUpdate());
result = setNonQuerySqlReturn(updateResult, properties);
} }
postSql(connection, postStatementsBinds); postSql(connection, postStatementsBinds);
this.setResultString(result);
} catch (Exception e) { } catch (Exception e) {
logger.error("execute sql error", e); logger.error("execute sql error", e);
@ -274,13 +279,28 @@ public class SqlTask extends AbstractTask {
} }
} }
public String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
String result = null;
for (Property info :properties) {
if (Direct.OUT == info.getDirect()) {
List<Map<String,String>> updateRL = new ArrayList<>();
Map<String,String> updateRM = new HashMap<>();
updateRM.put(info.getProp(),updateResult);
updateRL.add(updateRM);
result = JSONUtils.toJsonString(updateRL);
break;
}
}
return result;
}
/** /**
* result process * result process
* *
* @param resultSet resultSet * @param resultSet resultSet
* @throws Exception Exception * @throws Exception Exception
*/ */
private void resultProcess(ResultSet resultSet) throws Exception { private String resultProcess(ResultSet resultSet) throws Exception {
ArrayNode resultJSONArray = JSONUtils.createArrayNode(); ArrayNode resultJSONArray = JSONUtils.createArrayNode();
ResultSetMetaData md = resultSet.getMetaData(); ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount(); int num = md.getColumnCount();
@ -300,6 +320,7 @@ public class SqlTask extends AbstractTask {
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets", sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
JSONUtils.toJsonString(resultJSONArray)); JSONUtils.toJsonString(resultJSONArray));
return result;
} }
/** /**

5
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -119,6 +119,11 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
String result = responseCommand.getResult();
responseCommand.setResult("return string");
taskCallbackService.sendResult(1, responseCommand.convert2Command());
Stopper.stop(); Stopper.stop();
nettyRemotingServer.close(); nettyRemotingServer.close();

75
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java

@ -17,13 +17,22 @@
package org.apache.dolphinscheduler.server.worker.task; package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -116,4 +125,70 @@ public class TaskManagerTest {
taskExecutionContext.setTaskType("XXX"); taskExecutionContext.setTaskType("XXX");
TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService); TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
} }
@Test
public void testShellTaskReturnString() {
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
taskExecutionContext.setTaskType("SHELL");
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setTaskJson(
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
+ "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\","
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":"
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},"
+ "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]");
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(5);
taskExecutionContext.setTenantCode("roo");
taskExecutionContext.setScheduleTime(new Date());
taskExecutionContext.setQueue("default");
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
+
"[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
Map<String, String> definedParams = new HashMap<>();
definedParams.put("time_gb", "2020-12-16 00:00:00");
taskExecutionContext.setDefinedParams(definedParams);
ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
shellTask.setResultString("shell return");
String shellReturn = shellTask.getResultString();
shellTask.init();
shellTask.setResult(shellReturn);
Assert.assertSame(shellReturn, "shell return");
}
@Test
public void testSqlTaskReturnString() {
String params = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\","
+ "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}";
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskParams("{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}],"
+ "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\","
+ "\"sqlType\":1}");
taskExecutionContext.setExecutePath("/tmp");
taskExecutionContext.setTaskAppId("1");
taskExecutionContext.setTenantCode("root");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setTaskTimeout(10000);
taskExecutionContext.setLogPath("/tmp/dx");
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
sqlTaskExecutionContext.setConnectionParams(params);
taskExecutionContext.setSqlTaskExecutionContext(sqlTaskExecutionContext);
SqlTask sqlTask = new SqlTask(taskExecutionContext, logger, null);
SqlParameters sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
List<Property> properties = sqlParameters.getLocalParams();
sqlTask.setNonQuerySqlReturn("sql return", properties);
}
} }

115
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskReturnTest.java

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.shell;
import static org.mockito.ArgumentMatchers.anyString;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* shell task return test.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ShellTask.class})
public class ShellTaskReturnTest {
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
private ShellTask shellTask;
private ShellCommandExecutor shellCommandExecutor;
private TaskExecutionContext taskExecutionContext;
private CommandExecuteResult commandExecuteResult;
@Before
public void before() throws Exception {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
taskExecutionContext.setTaskType("SHELL");
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setTaskJson(
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+ "\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,"
+ "\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
+ "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\","
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":"
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},"
+ "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]");
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(5);
taskExecutionContext.setTenantCode("roo");
taskExecutionContext.setScheduleTime(new Date());
taskExecutionContext.setQueue("default");
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
+
"[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
Map<String, String> definedParams = new HashMap<>();
definedParams.put("time_gb", "2020-12-16 00:00:00");
taskExecutionContext.setDefinedParams(definedParams);
PowerMockito.mockStatic(Files.class);
PowerMockito.when(Files.exists(Paths.get(anyString()))).thenReturn(true);
commandExecuteResult = new CommandExecuteResult();
commandExecuteResult.setAppIds("appId");
commandExecuteResult.setExitStatusCode(0);
commandExecuteResult.setProcessId(1);
}
@Test
public void testShellReturnString() {
shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init();
try {
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
} catch (Exception e) {
e.printStackTrace();
}
shellTask.setResult("shell return string");
logger.info("shell return string:{}", shellTask.getResultString());
}
}

18
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java

@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.task.shell;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.sql.DriverManager;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory;
* shell task test. * shell task test.
*/ */
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({ShellTask.class}) @PrepareForTest(value = {ShellTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class})
public class ShellTaskTest { public class ShellTaskTest {
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class); private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
@ -57,6 +60,7 @@ public class ShellTaskTest {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor); PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
shellCommandExecutor.setResultString("shellReturn");
taskExecutionContext = new TaskExecutionContext(); taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test"); taskExecutionContext.setTaskName("kris test");
@ -68,7 +72,7 @@ public class ShellTaskTest {
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"" "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
+ +
"tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\",\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? " "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\",\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":" + "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":"
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}" + "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":" + "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
@ -82,7 +86,7 @@ public class ShellTaskTest {
taskExecutionContext.setTaskParams( taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
+ +
"[{\"prop\":\"time1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR" "[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}"); + "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
Map<String, String> definedParams = new HashMap<>(); Map<String, String> definedParams = new HashMap<>();
definedParams.put("time_gb", "2020-12-16 00:00:00"); definedParams.put("time_gb", "2020-12-16 00:00:00");
@ -111,4 +115,12 @@ public class ShellTaskTest {
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
shellTask.handle(); shellTask.handle();
} }
@Test
public void testSetResult() {
shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init();
String r = "return";
shellTask.setResult(r);
}
} }

12
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java

@ -19,10 +19,12 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory;
* sql task test * sql task test
*/ */
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest(value = {SqlTask.class, DriverManager.class}) @PrepareForTest(value = {SqlTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class})
public class SqlTaskTest { public class SqlTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class); private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class);
@ -70,7 +72,9 @@ public class SqlTaskTest {
props.setTaskStartTime(new Date()); props.setTaskStartTime(new Date());
props.setTaskTimeout(0); props.setTaskTimeout(0);
props.setTaskParams( props.setTaskParams(
"{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\",\"sqlType\":1}"); "{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}],"
+ "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\","
+ "\"sqlType\":1}");
taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams()); PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams());
@ -85,6 +89,8 @@ public class SqlTaskTest {
sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS); sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext); PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao());
alertClientService = PowerMockito.mock(AlertClientService.class); alertClientService = PowerMockito.mock(AlertClientService.class);
sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService); sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService);
sqlTask.init(); sqlTask.init();
@ -95,7 +101,7 @@ public class SqlTaskTest {
Assert.assertNotNull(sqlTask.getParameters()); Assert.assertNotNull(sqlTask.getParameters());
} }
@Test(expected = Exception.class) @Test
public void testHandle() throws Exception { public void testHandle() throws Exception {
Connection connection = PowerMockito.mock(Connection.class); Connection connection = PowerMockito.mock(Connection.class);
PowerMockito.mockStatic(DriverManager.class); PowerMockito.mockStatic(DriverManager.class);

2
dolphinscheduler-ui/.env

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
# back end interface address # back end interface address
API_BASE = http://192.168.xx.xx:12345 API_BASE = http://127.0.0.1:12345
# If IP access is required for local development, remove the "#" # If IP access is required for local development, remove the "#"
#DEV_HOST = 192.168.xx.xx #DEV_HOST = 192.168.xx.xx

Loading…
Cancel
Save