Browse Source

cherry-pick [Bug] Fix procedure task param pass #11919 (#11926)

3.1.0-release
caishunfeng 2 years ago committed by GitHub
parent
commit
8d1c2d3eeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      docs/docs/en/guide/task/stored-procedure.md
  2. 19
      docs/docs/zh/guide/task/stored-procedure.md
  3. BIN
      docs/img/procedure-en.png
  4. BIN
      docs/img/procedure_edit.png
  5. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
  6. 40
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

48
docs/docs/en/guide/task/stored-procedure.md

@ -2,27 +2,39 @@
- Execute the stored procedure according to the selected DataSource.
> Drag from the toolbar ![PNG](https://analysys.github.io/easyscheduler_docs_cn/images/toolbar_PROCEDURE.png) task node into the canvas, as shown in the figure below:
> Drag from the `PROCEDURE` task node into the canvas, as shown in the figure below:
<p align="center">
<img src="../../../../img/procedure-en.png" width="80%" />
<img src="../../../../img/procedure_edit.png" width="80%" />
</p>
## Task Parameters
| **Parameter** | **Description** |
| ------- | ---------- |
| Node Name | Set the name of the task. Node names within a workflow definition are unique. |
| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. |
| Description | Describes the function of this node. |
| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. |
| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. |
| Task group name | The group in Resources, if not configured, it will not be used. |
| Environment Name | Configure the environment in which to run the script. |
| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. |
| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. |
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
| DataSource | The DataSource type of the stored procedure supports MySQL and POSTGRESQL, select the corresponding DataSource. |
| Method | The method name of the stored procedure. |
| Custom parameters | The custom parameter types of the stored procedure support `IN` and `OUT`, and the data types support: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and BOOLEAN. |
| Predecessor task | Selecting the predecessor task of the current task will set the selected predecessor task as the upstream of the current task. |
| **Parameter** | **Description** |
|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Node Name | Set the name of the task. Node names within a workflow definition are unique. |
| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. |
| Description | Describes the function of this node. |
| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. |
| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. |
| Task group name | The group in Resources, if not configured, it will not be used. |
| Environment Name | Configure the environment in which to run the script. |
| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. |
| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. |
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
| DataSource | The DataSource type of the stored procedure supports MySQL, POSTGRESQL, ORACLE. |
| SQL Statement | call a stored procedure, such as `call test(${in1},${out1});`. |
| Custom parameters | The custom parameter types of the stored procedure support `IN` and `OUT`, and the data types support: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and BOOLEAN. |
| Predecessor task | Selecting the predecessor task of the current task will set the selected predecessor task as the upstream of the current task. |
## Remark
- Prepare: Create a stored procedure in the database, such as:
```
CREATE PROCEDURE dolphinscheduler.test(in in1 INT, out out1 INT)
begin
set out1=in1;
END
```

19
docs/docs/zh/guide/task/stored-procedure.md

@ -1,12 +1,23 @@
# 存储过程节点
- 根据选择的数据源,执行存储过程。
> 拖动工具栏中的![PNG](https://analysys.github.io/easyscheduler_docs_cn/images/toolbar_PROCEDURE.png)任务节点到画板中,如下图所示:
> 拖动工具栏中的`PROCEDURE`任务节点到画板中,如下图所示:
<p align="center">
<img src="../../../../img/procedure_edit.png" width="80%" />
</p>
- 数据源:存储过程的数据源类型支持MySQL和POSTGRESQL两种,选择对应的数据源
- 方法:是存储过程的方法名称
- 自定义参数:存储过程的自定义参数类型支持IN、OUT两种,数据类型支持VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN九种数据类型
- 前提:在该数据库里面创建存储过程,如:
```
CREATE PROCEDURE dolphinscheduler.test(in in1 INT, out out1 INT)
begin
set out1=in1;
END
```
- 数据源:存储过程的数据源类型支持MySQL、POSTGRESQL、ORACLE,选择对应的数据源
- SQL Statement:调用存储过程,如 `call test(${in1},${out1});`
- 自定义参数:存储过程的自定义参数类型支持IN、OUT两种,数据类型支持VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN九种数据类型;

BIN
docs/img/procedure-en.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 162 KiB

BIN
docs/img/procedure_edit.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 49 KiB

After

Width:  |  Height:  |  Size: 113 KiB

8
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java

@ -103,7 +103,6 @@ public abstract class AbstractTask {
public abstract void handle() throws TaskException;
/**
* cancel application
*
@ -130,7 +129,9 @@ public abstract class AbstractTask {
/*
* analysis log? get submitted yarn application id
*/
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) {
try (
BufferedReader br = new BufferedReader(
new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
String appId = findAppId(line);
@ -244,5 +245,4 @@ public abstract class AbstractTask {
}
return status;
}
}
}

40
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.procedure;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
@ -41,8 +44,7 @@ import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import com.google.common.collect.Maps;
/**
* procedure task
@ -73,14 +75,16 @@ public class ProcedureTask extends AbstractTaskExecutor {
logger.info("procedure task params {}", taskExecutionContext.getTaskParams());
this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);
this.procedureParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);
// check parameters
if (!procedureParameters.checkParameters()) {
throw new RuntimeException("procedure task params is not valid");
}
procedureTaskExecutionContext = procedureParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
procedureTaskExecutionContext =
procedureParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
}
@Override
@ -97,13 +101,22 @@ public class ProcedureTask extends AbstractTaskExecutor {
// load class
DbType dbType = DbType.valueOf(procedureParameters.getType());
// get datasource
ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
procedureTaskExecutionContext.getConnectionParams());
ConnectionParam connectionParam =
DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
procedureTaskExecutionContext.getConnectionParams());
// get jdbc connection
connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
Map<Integer, Property> sqlParamsMap = new HashMap<>();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap()
: taskExecutionContext.getPrepareParamsMap();
if (procedureParameters.getOutProperty() != null) {
// set out params before format sql
paramsMap.putAll(procedureParameters.getOutProperty());
}
// format sql
String proceduerSql = formatSql(sqlParamsMap, paramsMap);
// call method
stmt = connection.prepareCall(proceduerSql);
@ -131,7 +144,8 @@ public class ProcedureTask extends AbstractTaskExecutor {
private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsMap) {
// combining local and global parameters
setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap,
taskExecutionContext.getTaskInstanceId());
return procedureParameters.getMethod().replaceAll(rgex, "?");
}
@ -162,8 +176,8 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @return outParameterMap
* @throws Exception Exception
*/
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap
, Map<String, Property> totalParamsMap) throws Exception {
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap,
Map<String, Property> totalParamsMap) throws Exception {
Map<Integer, Property> outParameterMap = new HashMap<>();
if (procedureParameters.getLocalParametersMap() == null) {
return outParameterMap;
@ -174,7 +188,8 @@ public class ProcedureTask extends AbstractTaskExecutor {
for (Map.Entry<Integer, Property> entry : paramsMap.entrySet()) {
Property property = entry.getValue();
if (property.getDirect().equals(Direct.IN)) {
ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
ParameterUtils.setInParameter(index, stmt, property.getType(),
totalParamsMap.get(property.getProp()).getValue());
} else if (property.getDirect().equals(Direct.OUT)) {
setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
outParameterMap.put(index, property);
@ -231,7 +246,8 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @param dataType dataType
* @throws SQLException SQLException
*/
private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
private Object getOutputParameter(CallableStatement stmt, int index, String prop,
DataType dataType) throws SQLException {
Object value = null;
switch (dataType) {
case VARCHAR:

Loading…
Cancel
Save