Browse Source

[Improvement][Task Plugin] Support Task Plugin output response as a parameter (#13529)

* support http output response as param

* add UT

* fix sqoop UT
3.2.0-release
JieguangZhou 1 year ago committed by GitHub
parent
commit
16b193454b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      docs/docs/en/guide/task/http.md
  2. 10
      docs/docs/zh/guide/task/http.md
  3. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
  4. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
  5. 19
      dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/test/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskTest.java
  6. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/test/java/org/apache/dolphinscheduler/plugin/task/sqoop/EntityTestUtils.java

16
docs/docs/en/guide/task/http.md

@ -17,12 +17,24 @@ This node is used to perform http type tasks such as the common POST and GET req
- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
| **Parameter** | **Description** |
|----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|---|--------------------|---|---|-------------------------|-----------------------------------------------------------------------------------------------|
|-------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Request address | HTTP request URL. |
| Request type | Supports GET, POSt, HEAD, PUT, DELETE. || Request parameters | Supports Parameter, Body, Headers. || Verification conditions | Supports default response code, custom response code, content included, content not included. |
| Request type | Supports GET, POST, HEAD, PUT, DELETE. |
| Request parameters | Supports Parameter, Body, Headers. |
| Verification conditions | Supports default response code, custom response code, content included, content not included. |
| Verification content | When the verification condition selects a custom response code, the content contains, and the content does not contain, the verification content is required. |
| Custom parameter | It is a user-defined parameter of http part, which will replace the content with `${variable}` in the script. |
## Task Output Parameters
| **Task Parameter** | **Description** |
|--------------------|-------------------------------------|
| response | VARCHAR, http request return result |
Can use `${taskName.response}` to reference task output parameters in downstream tasks.
For example, if the current task1 is a http task, the downstream task can use `${task1.response}` to reference the output parameters of task1.
## Example
HTTP defines the different methods of interacting with the server, the most basic methods are GET, POST, PUT and DELETE. Here we use the http task node to demonstrate the use of POST to send a request to the system's login page to submit data.

10
docs/docs/zh/guide/task/http.md

@ -26,6 +26,16 @@
| 校验内容 | 当校验条件选择自定义响应码、内容包含、内容不包含时,需填写校验内容 |
| 自定义参数 | 是 http 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容 |
## 任务输出参数
| **任务参数** | **描述** |
|----------|---------------------|
| response | VARCHAR, http请求返回结果 |
可以在下游任务中使用 ${taskName.response} 引用任务输出参数。
如,当前task1为http任务, 下游任务可以使用 `${task1.response}` 引用task1的输出参数
## 任务样例
HTTP 定义了与服务器交互的不同方法,最基本的方法有4种,分别是GET,POST,PUT,DELETE。这里我们使用 http 任务节点,演示使用 POST 向系统的登录页面发送请求,提交数据。

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java

@ -199,7 +199,7 @@ public abstract class AbstractParameters implements IParameters {
return new ResourceParametersHelper();
}
private void addPropertyToValPool(Property property) {
public void addPropertyToValPool(Property property) {
varPool.removeIf(p -> p.getProp().equals(property.getProp()));
varPool.add(property);
}

13
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java

@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
@ -101,6 +103,7 @@ public class HttpTask extends AbstractTask {
statusCode = String.valueOf(getStatusCode(response));
body = getResponseBody(response);
exitStatusCode = validResponse(body, statusCode);
addDefaultOutput(body);
long costTime = System.currentTimeMillis() - startTime;
log.info(
"startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {} milliseconds, statusCode : {}, body : {}, log : {}",
@ -327,4 +330,14 @@ public class HttpTask extends AbstractTask {
public AbstractParameters getParameters() {
return this.httpParameters;
}
public void addDefaultOutput(String response) {
// put response in output
Property outputProperty = new Property();
outputProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), "response"));
outputProperty.setDirect(Direct.OUT);
outputProperty.setType(DataType.VARCHAR);
outputProperty.setValue(response);
httpParameters.addPropertyToValPool(outputProperty);
}
}

19
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/test/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskTest.java

@ -21,7 +21,10 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
@ -174,6 +177,22 @@ public class HttpTaskTest {
Assertions.assertEquals(EXIT_CODE_SUCCESS, httpTask.getExitStatusCode());
}
@Test
public void testAddDefaultOutput() throws Exception {
HttpTask httpTask = generateHttpTask(HttpMethod.GET, HttpStatus.SC_OK);
AbstractParameters httpParameters = httpTask.getParameters();
String response = "{\"status\": \"success\"}";
httpTask.addDefaultOutput(response);
List<Property> varPool = httpParameters.getVarPool();
Assertions.assertEquals(1, varPool.size());
Property property = varPool.get(0);
Assertions.assertEquals("null.response", property.getProp());
Assertions.assertEquals(Direct.OUT, property.getDirect());
Assertions.assertEquals(DataType.VARCHAR, property.getType());
Assertions.assertEquals(response, property.getValue());
}
private String withMockWebServer(String path, int actualResponseCode,
String actualResponseBody) throws IOException {
MockWebServer server = new MockWebServer();

5
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/test/java/org/apache/dolphinscheduler/plugin/task/sqoop/EntityTestUtils.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.plugin.task.sqoop;
import org.apache.commons.lang3.ObjectUtils;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@ -75,6 +77,9 @@ public class EntityTestUtils {
for (int i = 0; i < parameterTypes.length; i++) {
objects[i] = OBJECT_MAP.get(parameterTypes[i].getName());
}
if (ObjectUtils.allNull(objects)) {
break;
}
method.invoke(tempInstance, objects);
} else {
method.invoke(tempInstance);

Loading…
Cancel
Save