diff --git a/docs/docs/en/guide/task/stored-procedure.md b/docs/docs/en/guide/task/stored-procedure.md index 5aff75207c..a1a3942091 100644 --- a/docs/docs/en/guide/task/stored-procedure.md +++ b/docs/docs/en/guide/task/stored-procedure.md @@ -2,27 +2,39 @@ - Execute the stored procedure according to the selected DataSource. -> Drag from the toolbar ![PNG](https://analysys.github.io/easyscheduler_docs_cn/images/toolbar_PROCEDURE.png) task node into the canvas, as shown in the figure below: +> Drag from the `PROCEDURE` task node into the canvas, as shown in the figure below:

- +

## Task Parameters -| **Parameter** | **Description** | -| ------- | ---------- | -| Node Name | Set the name of the task. Node names within a workflow definition are unique. | -| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. | -| Description | Describes the function of this node. | -| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. | -| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. | -| Task group name | The group in Resources, if not configured, it will not be used. | -| Environment Name | Configure the environment in which to run the script. | -| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. | -| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. | -| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. | -| DataSource | The DataSource type of the stored procedure supports MySQL and POSTGRESQL, select the corresponding DataSource. | -| Method | The method name of the stored procedure. | -| Custom parameters | The custom parameter types of the stored procedure support `IN` and `OUT`, and the data types support: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and BOOLEAN. | -| Predecessor task | Selecting the predecessor task of the current task will set the selected predecessor task as the upstream of the current task. | \ No newline at end of file +| **Parameter** | **Description** | +|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Node Name | Set the name of the task. Node names within a workflow definition are unique. | +| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. | +| Description | Describes the function of this node. | +| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. | +| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. | +| Task group name | The group in Resources, if not configured, it will not be used. | +| Environment Name | Configure the environment in which to run the script. | +| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. | +| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. | +| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. | +| DataSource | The DataSource type of the stored procedure supports MySQL, POSTGRESQL, ORACLE. | +| SQL Statement | call a stored procedure, such as `call test(${in1},${out1});`. | +| Custom parameters | The custom parameter types of the stored procedure support `IN` and `OUT`, and the data types support: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and BOOLEAN. | +| Predecessor task | Selecting the predecessor task of the current task will set the selected predecessor task as the upstream of the current task. | + +## Remark + +- Prepare: Create a stored procedure in the database, such as: + + ``` + CREATE PROCEDURE dolphinscheduler.test(in in1 INT, out out1 INT) + begin + set out1=in1; + END + ``` + diff --git a/docs/docs/zh/guide/task/stored-procedure.md b/docs/docs/zh/guide/task/stored-procedure.md index 48b3dc946b..b383ae3d10 100644 --- a/docs/docs/zh/guide/task/stored-procedure.md +++ b/docs/docs/zh/guide/task/stored-procedure.md @@ -1,12 +1,23 @@ # 存储过程节点 - 根据选择的数据源,执行存储过程。 -> 拖动工具栏中的![PNG](https://analysys.github.io/easyscheduler_docs_cn/images/toolbar_PROCEDURE.png)任务节点到画板中,如下图所示: + +> 拖动工具栏中的`PROCEDURE`任务节点到画板中,如下图所示:

-- 数据源:存储过程的数据源类型支持MySQL和POSTGRESQL两种,选择对应的数据源 -- 方法:是存储过程的方法名称 -- 自定义参数:存储过程的自定义参数类型支持IN、OUT两种,数据类型支持VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN九种数据类型 +- 前提:在该数据库里面创建存储过程,如: + +``` +CREATE PROCEDURE dolphinscheduler.test(in in1 INT, out out1 INT) +begin + set out1=in1; +END +``` + +- 数据源:存储过程的数据源类型支持MySQL、POSTGRESQL、ORACLE,选择对应的数据源 +- SQL Statement:调用存储过程,如 `call test(${in1},${out1});`; +- 自定义参数:存储过程的自定义参数类型支持IN、OUT两种,数据类型支持VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN九种数据类型; + diff --git a/docs/img/procedure-en.png b/docs/img/procedure-en.png deleted file mode 100644 index b4679b37ab..0000000000 Binary files a/docs/img/procedure-en.png and /dev/null differ diff --git a/docs/img/procedure_edit.png b/docs/img/procedure_edit.png index 004c0ba1b2..06bcd99d47 100644 Binary files a/docs/img/procedure_edit.png and b/docs/img/procedure_edit.png differ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java index 0b23905059..a3c26bfaec 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java @@ -103,7 +103,6 @@ public abstract class AbstractTask { public abstract void handle() throws TaskException; - /** * cancel application * @@ -130,7 +129,9 @@ public abstract class AbstractTask { /* * analysis log? get submitted yarn application id */ - try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) { + try ( + BufferedReader br = new BufferedReader( + new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) { String line; while ((line = br.readLine()) != null) { String appId = findAppId(line); @@ -244,5 +245,4 @@ public abstract class AbstractTask { } return status; } - -} \ No newline at end of file +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index 02e960ebdb..df0fbae71e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.plugin.task.procedure; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; + import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; @@ -41,8 +44,7 @@ import java.sql.Types; import java.util.HashMap; import java.util.Map; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; +import com.google.common.collect.Maps; /** * procedure task @@ -73,14 +75,16 @@ public class ProcedureTask extends AbstractTaskExecutor { logger.info("procedure task params {}", taskExecutionContext.getTaskParams()); - this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class); + this.procedureParameters = + JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class); // check parameters if (!procedureParameters.checkParameters()) { throw new RuntimeException("procedure task params is not valid"); } - procedureTaskExecutionContext = procedureParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper()); + procedureTaskExecutionContext = + procedureParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper()); } @Override @@ -97,13 +101,22 @@ public class ProcedureTask extends AbstractTaskExecutor { // load class DbType dbType = DbType.valueOf(procedureParameters.getType()); // get datasource - ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()), - procedureTaskExecutionContext.getConnectionParams()); + ConnectionParam connectionParam = + DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()), + procedureTaskExecutionContext.getConnectionParams()); // get jdbc connection connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam); + Map sqlParamsMap = new HashMap<>(); - Map paramsMap = taskExecutionContext.getPrepareParamsMap(); + Map paramsMap = taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap() + : taskExecutionContext.getPrepareParamsMap(); + if (procedureParameters.getOutProperty() != null) { + // set out params before format sql + paramsMap.putAll(procedureParameters.getOutProperty()); + } + + // format sql String proceduerSql = formatSql(sqlParamsMap, paramsMap); // call method stmt = connection.prepareCall(proceduerSql); @@ -131,7 +144,8 @@ public class ProcedureTask extends AbstractTaskExecutor { private String formatSql(Map sqlParamsMap, Map paramsMap) { // combining local and global parameters - setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId()); + setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, + taskExecutionContext.getTaskInstanceId()); return procedureParameters.getMethod().replaceAll(rgex, "?"); } @@ -162,8 +176,8 @@ public class ProcedureTask extends AbstractTaskExecutor { * @return outParameterMap * @throws Exception Exception */ - private Map getOutParameterMap(CallableStatement stmt, Map paramsMap - , Map totalParamsMap) throws Exception { + private Map getOutParameterMap(CallableStatement stmt, Map paramsMap, + Map totalParamsMap) throws Exception { Map outParameterMap = new HashMap<>(); if (procedureParameters.getLocalParametersMap() == null) { return outParameterMap; @@ -174,7 +188,8 @@ public class ProcedureTask extends AbstractTaskExecutor { for (Map.Entry entry : paramsMap.entrySet()) { Property property = entry.getValue(); if (property.getDirect().equals(Direct.IN)) { - ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue()); + ParameterUtils.setInParameter(index, stmt, property.getType(), + totalParamsMap.get(property.getProp()).getValue()); } else if (property.getDirect().equals(Direct.OUT)) { setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue()); outParameterMap.put(index, property); @@ -231,7 +246,8 @@ public class ProcedureTask extends AbstractTaskExecutor { * @param dataType dataType * @throws SQLException SQLException */ - private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException { + private Object getOutputParameter(CallableStatement stmt, int index, String prop, + DataType dataType) throws SQLException { Object value = null; switch (dataType) { case VARCHAR: