Browse Source

[DSIP-72][Dynamic Task] Remove dynamic task type (#16842)

* remove dynamic task type

* remove dynamic task type

* remove dynamic task type

* resolve conflicts

* resolve conflicts

* resolve conflicts

* fix comment

* fix ci
dev
xiangzihao 1 month ago committed by GitHub
parent
commit
f56d51cc28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 8
      docs/configs/docsdev.js
  2. 77
      docs/docs/en/guide/task/dynamic.md
  3. 1
      docs/docs/en/guide/upgrade/incompatible.md
  4. 76
      docs/docs/zh/guide/task/dynamic.md
  5. 5
      docs/docs/zh/guide/upgrade/incompatible.md
  6. BIN
      docs/img/tasks/demo/dynamic_definition.png
  7. BIN
      docs/img/tasks/demo/dynamic_running.png
  8. BIN
      docs/img/tasks/icons/dynamic.png
  9. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  10. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java
  11. 1
      dolphinscheduler-api/src/main/resources/task-type-config.yaml
  12. 12
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java
  13. 21
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
  14. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
  15. 88
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ListenerEvent.java
  16. 2
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql
  17. 2
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql
  18. 4
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java
  19. 173
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicAsyncTaskExecuteFunction.java
  20. 86
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicCommandUtils.java
  21. 339
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTask.java
  22. 74
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTaskPluginFactory.java
  23. 33
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicOutput.java
  24. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/IWorkflowStateAction.java
  25. 117
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowWaitToRunStateAction.java
  26. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java
  27. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
  28. 30
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannel.java
  29. 38
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannelFactory.java
  30. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java
  31. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManagerTest.java
  32. BIN
      dolphinscheduler-ui/public/images/task-icons/dynamic.png
  33. BIN
      dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png
  34. 3
      dolphinscheduler-ui/src/locales/en_US/project.ts
  35. 3
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  36. 3
      dolphinscheduler-ui/src/store/project/task-type.ts
  37. 1
      dolphinscheduler-ui/src/store/project/types.ts
  38. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx
  39. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
  40. 120
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts
  41. 10
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  42. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
  43. 83
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts
  44. 4
      dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
  45. 6
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

8
docs/configs/docsdev.js

@ -101,10 +101,6 @@ export default {
title: 'SubWorkflow', title: 'SubWorkflow',
link: '/en-us/docs/dev/user_doc/guide/task/sub-workflow.html', link: '/en-us/docs/dev/user_doc/guide/task/sub-workflow.html',
}, },
{
title: 'Dynamic',
link: '/en-us/docs/dev/user_doc/guide/task/dynamic.html',
},
{ {
title: 'Dependent', title: 'Dependent',
link: '/en-us/docs/dev/user_doc/guide/task/dependent.html', link: '/en-us/docs/dev/user_doc/guide/task/dependent.html',
@ -816,10 +812,6 @@ export default {
title: 'SubWorkflow', title: 'SubWorkflow',
link: '/zh-cn/docs/dev/user_doc/guide/task/sub-workflow.html', link: '/zh-cn/docs/dev/user_doc/guide/task/sub-workflow.html',
}, },
{
title: 'Dynamic',
link: '/zh-cn/docs/dev/user_doc/guide/task/dynamic.html',
},
{ {
title: 'Dependent', title: 'Dependent',
link: '/zh-cn/docs/dev/user_doc/guide/task/dependent.html', link: '/zh-cn/docs/dev/user_doc/guide/task/dependent.html',

77
docs/docs/en/guide/task/dynamic.md

@ -1,77 +0,0 @@
# Dynamic Task
## Overview
Dynamic task can input multiple parameter lists, calculate all parameter combinations through Cartesian product, and then execute each parameter combination as a sub-workflow node.
For example, we have a workflow with two input parameters, a, b.
We can use the dynamic node to define this workflow definition as a node, and then enter the parameter list
- Parameter a: a1, a2
- Parameter b: b1, b2
Then the dynamic node will calculate four parameter combinations, which are
- a1, b1
- a1, b2
- a2, b1
- a2, b2
Then execute these four parameter combinations as the startup parameters of the sub-workflow node, and a total of four sub-workflow nodes are generated.
## Create Task
- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the "Create Workflow" button to enter the
DAG editing page.
- Drag from the toolbar <img src="../../../../img/tasks/icons/dynamic.png" width="15"/> task node to canvas.
The task definition is shown in the following figure:
![dynamic_definition](../../../../img/tasks/demo/dynamic_definition.png)
## Task Parameters
[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
[//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix]&#40;appendix.md#default-task-parameters&#41; `Default Task Parameters` section for default parameters.)
- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
| **Task Parameters** | **Description** |
|-----------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Child Node | Select the workflow definition of the sub-workflow. You can jump to the workflow definition of the selected sub-workflow by entering the sub-node in the upper right corner. |
| max num of sub workflow instances | The maximum number of sub-workflow instances dynamically generated. After exceeding this upper limit, the dynamically generated sub-workflow instances will no longer be executed. |
| Parallelism | The parallelism of the sub-workflow instances dynamically generated, that is, the number of sub-workflow instances executed at the same time. |
| Param Value | The parameter of the sub-workflow instance dynamically generated, supports multiple parameters, and the parameters are separated by delimiters. |
| Filter Condition | The filter condition of the sub-workflow instance dynamically generated, supports multiple filter values, and the filter conditions are separated by commas, such as `2022,2023`, which will filter the parameter groups containing the values of 2022 and 2023. |
## Task Parameters Output
The output parameters of the dynamic node refer to the output parameters of the sub-workflow. The output parameters of all sub-workflows will be collected into a list as the output parameters of the dynamic node.
When the downstream task is referenced, it can be referenced by `${dynamic.out(TaskName)}`.
The value is a json, as shown below
```Json
[
{ "dynParams":{ "a":"a1", "b":"b1" }, "outputValue":{ "p":"a1-b1" }, "mappedTimes":1 },
{ "dynParams":{ "a":"a2", "b":"b1" }, "outputValue":{ "p":"a2-b1" }, "mappedTimes":2 },
{ "dynParams":{ "a":"a3", "b":"b1" }, "outputValue":{ "p":"a3-b1" }, "mappedTimes":3 }
]
```
- `dynParams` the input parameters of the sub-workflow
- `outputValue` is the output parameter of the sub-workflow. For example, the `p` here is a string that splices the output parameters `a` and `b` of the sub-workflow and outputs them in the form of variables `p`
- `mappedTimes` is the index of the execution of the sub-workflow, starting from 1
## Running Status
After the dynamic task is started, all parameter combinations will be calculated according to the input parameter list, and then a sub-workflow instance will be created for each parameter combination.
When the dynamic task is running, it will periodically check the statistical information of all current sub-workflow instances. If the parallelism is greater than the number of sub-workflow instances running, it will trigger the start of the appropriate number of sub-workflow instances (the sub-workflow instances are created first, and then the start is triggered later).
As shown below.
![dynamic_running](../../../../img/tasks/demo/dynamic_running.png)
The dynamic task will run successfully only when all sub-workflow instances are running successfully.

1
docs/docs/en/guide/upgrade/incompatible.md

@ -36,4 +36,5 @@ This document records the incompatible updates between each version. You need to
* Remove the `registry-disconnect-strategy` in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821) * Remove the `registry-disconnect-strategy` in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* Remove `exec-threads` in worker's `application.yaml`, please use `physical-task-config`;Remove `master-async-task-executor-thread-pool-size` in master's `application.yaml`, please use `logic-task-config` ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790) * Remove `exec-threads` in worker's `application.yaml`, please use `physical-task-config`;Remove `master-async-task-executor-thread-pool-size` in master's `application.yaml`, please use `logic-task-config` ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
* Drop unused column `other_params_json` in `t_ds_worker_group` ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860) * Drop unused column `other_params_json` in `t_ds_worker_group` ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)
* Remove the `Dynamic` from the `Task Plugin` ([#16482])(https://github.com/apache/dolphinscheduler/pull/16842)

76
docs/docs/zh/guide/task/dynamic.md

@ -1,76 +0,0 @@
# 动态节点
## 综述
动态节点可以通过输入多个参数列表,通过笛卡尔积计算出多所有的参数组合,然后将每个参数组合作为一个子工作流节点执行。
比如我们有一个工作流,它具有两个输入参数,a, b。
我们可以通过动态节点,将这个工作流定义当做一个节点,然后输入参数列表
- 参数a:a1, a2
- 参数b:b1, b2
那么动态节点会计算出四个参数组合,分别是
- a1, b1
- a1, b2
- a2, b1
- a2, b2
然后将这四个参数组合作为子工作流节点的启动参数执行,共生成四个子工作流节点。
## 创建任务
- 点击项目管理 -> 项目名称 -> 工作流定义,点击”创建工作流”按钮,进入 DAG 编辑页面:
- 拖动工具栏的 <img src="../../../../img/tasks/icons/dynamic.png" width="15"/> 任务节点到画板中。
任务定义如下图所示:
![dynamic_definition](../../../../img/tasks/demo/dynamic_definition.png)
## 任务参数
[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
[//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录]&#40;appendix.md#默认任务参数&#41;`默认任务参数`一栏。)
- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
| **任务参数** | **描述** |
|----------|------------------------------------------------------------------------------|
| 子节点 | 是选择子工作流的工作流定义,右上角进入该子节点可以跳转到所选子工作流的工作流定义 |
| 动态生成实例上限 | 是指动态生成的子工作流实例的上限,超过该上限后,动态生成的子工作流实例将不再执行 |
| 并行度 | 是指动态生成的子工作流实例的并行度,即同时执行的子工作流实例的数量 |
| 取值参数 | 是指动态生成的子工作流实例的参数,支持多个参数,参数之间用分隔符分隔 |
| 过滤条件 | 是指动态生成的子工作流实例的过滤条件,支持多个过滤值,过滤条件之间用逗号分隔, 如 `2022,2023`, 则会过来包含2022和2023的值的参数组 |
## 任务参数输出
动态节点的输出参数,是指子工作流的输出参数,所有子工作流的输出参数都会被收集到一个列表中,作为动态节点的输出参数。
下游任务引用的时候,可以通过 `${dynamic.out(TaskName)}` 的方式引用。
值为一个json,样例如下
```Json
[
{ "dynParams":{ "a":"a1", "b":"b1" }, "outputValue":{ "p":"a1-b1" }, "mappedTimes":1 },
{ "dynParams":{ "a":"a2", "b":"b1" }, "outputValue":{ "p":"a2-b1" }, "mappedTimes":2 },
{ "dynParams":{ "a":"a3", "b":"b1" }, "outputValue":{ "p":"a3-b1" }, "mappedTimes":3 }
]
```
其中
- `dynParams` 是子工作流的输入参数
- `outputValue` 是子工作流的输出参数, 如这里的`p`为将子工作流的输出参数`a`和`b`拼接起来的字符串,以变量`p`的形式输出
- `mappedTimes` 是子工作流的执行的index,从1开始
## 运行状态
Dynamic任务启动后,会根据输入参数列表,计算出所有的参数组合,然后对每一个参数组合,创建一个子工作流实例。
Dynamic运行时,会定期检测当前所有子工作流实例的统计信息,如果并行度大于运行中的子工作流实例的数量,则会触发启动合适数量的工作流实例(工作流实例是先创建,后续再出发启动)。
如下如所示。
![dynamic_running](../../../../img/tasks/demo/dynamic_running.png)
当且进度所有的子工作流实例全部运行成功时,dynamic任务才会运行成功。

5
docs/docs/zh/guide/upgrade/incompatible.md

@ -32,6 +32,7 @@
* 废弃从 1.x 至 2.x 的升级代码 ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543) * 废弃从 1.x 至 2.x 的升级代码 ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* 移除 `数据质量` 模块 ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794) * 移除 `数据质量` 模块 ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* 在`application.yaml`中移除`registry-disconnect-strategy`配置 ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821) * 在`application.yaml`中移除`registry-disconnect-strategy`配置 ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* 在worker的`application.yaml`中移除`exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代 ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790) * 在 `worker``application.yaml` 中移除 `exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代 ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
* 在`t_ds_worker_group` 表中移除 无用的`other_params_json`字段 ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860) * 在 `t_ds_worker_group` 表中移除 无用的 `other_params_json` 字段 ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)
* 从 `任务插件` 中移除 `Dynamic` 类型 ([#16482])(https://github.com/apache/dolphinscheduler/pull/16842)

BIN
docs/img/tasks/demo/dynamic_definition.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 27 KiB

BIN
docs/img/tasks/demo/dynamic_running.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

BIN
docs/img/tasks/icons/dynamic.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 10 KiB

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -274,8 +274,6 @@ public enum Status {
NOT_SUPPORT_SSO(10211, "Not support SSO login.", "不支持SSO登录"), NOT_SUPPORT_SSO(10211, "Not support SSO login.", "不支持SSO登录"),
STATE_CODE_ERROR(10212, "state inconsistency or state and code not pair", "状态码前后不一致或状态码和code不匹配"), STATE_CODE_ERROR(10212, "state inconsistency or state and code not pair", "状态码前后不一致或状态码和code不匹配"),
TASK_INSTANCE_NOT_DYNAMIC_TASK(10213, "task instance {0} is not dynamic", "任务实例[{0}]不是Dynamic类型"),
CREATE_PROJECT_PARAMETER_ERROR(10214, "create project parameter error", "创建项目参数错误"), CREATE_PROJECT_PARAMETER_ERROR(10214, "create project parameter error", "创建项目参数错误"),
UPDATE_PROJECT_PARAMETER_ERROR(10215, "update project parameter error", "更新项目参数错误"), UPDATE_PROJECT_PARAMETER_ERROR(10215, "update project parameter error", "更新项目参数错误"),

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java

@ -484,10 +484,6 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements Work
throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId); throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
} }
if (!TaskTypeUtils.isDynamicTask(taskInstance.getTaskType())) {
putMsg(result, Status.TASK_INSTANCE_NOT_DYNAMIC_TASK, taskInstance.getName());
throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
}
List<RelationSubWorkflow> relationSubWorkflows = relationSubWorkflowMapper List<RelationSubWorkflow> relationSubWorkflows = relationSubWorkflowMapper
.queryAllSubWorkflowInstance((long) taskInstance.getWorkflowInstanceId(), .queryAllSubWorkflowInstance((long) taskInstance.getWorkflowInstanceId(),
taskInstance.getTaskCode()); taskInstance.getTaskCode());

1
dolphinscheduler-api/src/main/resources/task-type-config.yaml

@ -41,7 +41,6 @@ task:
- 'DEPENDENT' - 'DEPENDENT'
- 'CONDITIONS' - 'CONDITIONS'
- 'SWITCH' - 'SWITCH'
- 'DYNAMIC'
dataIntegration: dataIntegration:
- 'SEATUNNEL' - 'SEATUNNEL'
- 'DATAX' - 'DATAX'

12
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecuteFunctionTest.java

@ -50,8 +50,7 @@ class StopWorkflowInstanceExecuteFunctionTest {
"RUNNING_EXECUTION", "RUNNING_EXECUTION",
"READY_PAUSE", "READY_PAUSE",
"READY_STOP", "READY_STOP",
"SERIAL_WAIT", "SERIAL_WAIT"})
"WAIT_TO_RUN"})
void exceptionIfWorkflowInstanceCannotStop_canStop(WorkflowExecutionStatus workflowExecutionStatus) { void exceptionIfWorkflowInstanceCannotStop_canStop(WorkflowExecutionStatus workflowExecutionStatus) {
WorkflowInstance workflowInstance = new WorkflowInstance(); WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setName("Workflow-1"); workflowInstance.setName("Workflow-1");
@ -65,8 +64,7 @@ class StopWorkflowInstanceExecuteFunctionTest {
"RUNNING_EXECUTION", "RUNNING_EXECUTION",
"READY_PAUSE", "READY_PAUSE",
"READY_STOP", "READY_STOP",
"SERIAL_WAIT", "SERIAL_WAIT"}, mode = EnumSource.Mode.EXCLUDE)
"WAIT_TO_RUN"}, mode = EnumSource.Mode.EXCLUDE)
void exceptionIfWorkflowInstanceCannotStop_canNotStop(WorkflowExecutionStatus workflowExecutionStatus) { void exceptionIfWorkflowInstanceCannotStop_canNotStop(WorkflowExecutionStatus workflowExecutionStatus) {
WorkflowInstance workflowInstance = new WorkflowInstance(); WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setName("Workflow-1"); workflowInstance.setName("Workflow-1");
@ -81,8 +79,7 @@ class StopWorkflowInstanceExecuteFunctionTest {
@ParameterizedTest @ParameterizedTest
@EnumSource(value = WorkflowExecutionStatus.class, names = { @EnumSource(value = WorkflowExecutionStatus.class, names = {
"SERIAL_WAIT", "SERIAL_WAIT"})
"WAIT_TO_RUN"})
void ifWorkflowInstanceCanDirectStopInDB_canDirectStopInDB(WorkflowExecutionStatus workflowExecutionStatus) { void ifWorkflowInstanceCanDirectStopInDB_canDirectStopInDB(WorkflowExecutionStatus workflowExecutionStatus) {
WorkflowInstance workflowInstance = new WorkflowInstance(); WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setName("Workflow-1"); workflowInstance.setName("Workflow-1");
@ -93,8 +90,7 @@ class StopWorkflowInstanceExecuteFunctionTest {
@ParameterizedTest @ParameterizedTest
@EnumSource(value = WorkflowExecutionStatus.class, names = { @EnumSource(value = WorkflowExecutionStatus.class, names = {
"SERIAL_WAIT", "SERIAL_WAIT"}, mode = EnumSource.Mode.EXCLUDE)
"WAIT_TO_RUN"}, mode = EnumSource.Mode.EXCLUDE)
void ifWorkflowInstanceCanDirectStopInDB_canNotDirectStopInDB(WorkflowExecutionStatus workflowExecutionStatus) { void ifWorkflowInstanceCanDirectStopInDB_canNotDirectStopInDB(WorkflowExecutionStatus workflowExecutionStatus) {
WorkflowInstance workflowInstance = new WorkflowInstance(); WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setName("Workflow-1"); workflowInstance.setName("Workflow-1");

21
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java

@ -20,10 +20,12 @@ package org.apache.dolphinscheduler.common.enums;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import lombok.Getter;
import lombok.NonNull; import lombok.NonNull;
import com.baomidou.mybatisplus.annotation.EnumValue; import com.baomidou.mybatisplus.annotation.EnumValue;
@Getter
public enum WorkflowExecutionStatus { public enum WorkflowExecutionStatus {
SUBMITTED_SUCCESS(0, "submitted"), SUBMITTED_SUCCESS(0, "submitted"),
@ -35,7 +37,6 @@ public enum WorkflowExecutionStatus {
FAILURE(6, "failure"), FAILURE(6, "failure"),
SUCCESS(7, "success"), SUCCESS(7, "success"),
SERIAL_WAIT(14, "serial wait"), SERIAL_WAIT(14, "serial wait"),
WAIT_TO_RUN(17, "wait to run"),
FAILOVER(18, "failover"); FAILOVER(18, "failover");
private static final Map<Integer, WorkflowExecutionStatus> CODE_MAP = new HashMap<>(); private static final Map<Integer, WorkflowExecutionStatus> CODE_MAP = new HashMap<>();
@ -50,8 +51,7 @@ public enum WorkflowExecutionStatus {
RUNNING_EXECUTION.getCode(), RUNNING_EXECUTION.getCode(),
READY_PAUSE.getCode(), READY_PAUSE.getCode(),
READY_STOP.getCode(), READY_STOP.getCode(),
SERIAL_WAIT.getCode(), SERIAL_WAIT.getCode()
WAIT_TO_RUN.getCode()
}; };
static { static {
@ -80,12 +80,11 @@ public enum WorkflowExecutionStatus {
return this == RUNNING_EXECUTION return this == RUNNING_EXECUTION
|| this == READY_PAUSE || this == READY_PAUSE
|| this == READY_STOP || this == READY_STOP
|| this == SERIAL_WAIT || this == SERIAL_WAIT;
|| this == WAIT_TO_RUN;
} }
public boolean canDirectStopInDB() { public boolean canDirectStopInDB() {
return this == SERIAL_WAIT || this == WAIT_TO_RUN; return this == SERIAL_WAIT;
} }
public boolean canPause() { public boolean canPause() {
@ -95,7 +94,7 @@ public enum WorkflowExecutionStatus {
} }
public boolean canDirectPauseInDB() { public boolean canDirectPauseInDB() {
return this == SERIAL_WAIT || this == WAIT_TO_RUN; return this == SERIAL_WAIT;
} }
public boolean isFinished() { public boolean isFinished() {
@ -145,14 +144,6 @@ public enum WorkflowExecutionStatus {
this.desc = desc; this.desc = desc;
} }
public int getCode() {
return code;
}
public String getDesc() {
return desc;
}
@Override @Override
public String toString() { public String toString() {
return name(); return name();

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java

@ -59,7 +59,6 @@ public class BusinessTimeUtils {
case RECOVER_SUSPENDED_PROCESS: case RECOVER_SUSPENDED_PROCESS:
case START_FAILURE_TASK_PROCESS: case START_FAILURE_TASK_PROCESS:
case REPEAT_RUNNING: case REPEAT_RUNNING:
case DYNAMIC_GENERATION:
case SCHEDULER: case SCHEDULER:
default: default:
businessDate = addDays(new Date(), -1); businessDate = addDays(new Date(), -1);

88
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ListenerEvent.java

@ -1,88 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.ListenerEventType;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_listener_event")
public class ListenerEvent {
/**
* primary key
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* content
*/
@TableField(value = "content")
private String content;
/**
* sign
*/
@TableField(value = "sign")
private String sign;
/**
* alert_status
*/
@TableField(value = "event_type")
private ListenerEventType eventType;
/**
* post_status
*/
@TableField("post_status")
private AlertStatus postStatus;
/**
* log
*/
@TableField("log")
private String log;
/**
* create_time
*/
@TableField("create_time")
private Date createTime;
/**
* update_time
*/
@TableField("update_time")
private Date updateTime;
}

2
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_dml.sql

@ -19,5 +19,3 @@ UPDATE t_ds_task_definition SET task_type = 'SUB_WORKFLOW' WHERE task_type = 'SU
UPDATE t_ds_task_definition_log SET task_type = 'SUB_WORKFLOW' WHERE task_type = 'SUB_PROCESS'; UPDATE t_ds_task_definition_log SET task_type = 'SUB_WORKFLOW' WHERE task_type = 'SUB_PROCESS';
UPDATE t_ds_task_definition SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'SUB_WORKFLOW'; UPDATE t_ds_task_definition SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'SUB_WORKFLOW';
UPDATE t_ds_task_definition_log SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'SUB_WORKFLOW'; UPDATE t_ds_task_definition_log SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'SUB_WORKFLOW';
UPDATE t_ds_task_definition SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'DYNAMIC';
UPDATE t_ds_task_definition_log SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'DYNAMIC';

2
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_dml.sql

@ -19,5 +19,3 @@ UPDATE t_ds_task_definition SET task_type = 'SUB_WORKFLOW' WHERE task_type = 'SU
UPDATE t_ds_task_definition_log SET task_type = 'SUB_WORKFLOW' WHERE task_type = 'SUB_PROCESS'; UPDATE t_ds_task_definition_log SET task_type = 'SUB_WORKFLOW' WHERE task_type = 'SUB_PROCESS';
UPDATE t_ds_task_definition SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'SUB_WORKFLOW'; UPDATE t_ds_task_definition SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'SUB_WORKFLOW';
UPDATE t_ds_task_definition_log SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'SUB_WORKFLOW'; UPDATE t_ds_task_definition_log SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'SUB_WORKFLOW';
UPDATE t_ds_task_definition SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'DYNAMIC';
UPDATE t_ds_task_definition_log SET task_params = replace(task_params, 'processDefinitionCode', 'workflowDefinitionCode') where task_type = 'DYNAMIC';

4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java

@ -64,9 +64,7 @@ class WorkflowInstanceDaoImplTest extends BaseDaoTest {
WorkflowExecutionStatus.READY_STOP)); WorkflowExecutionStatus.READY_STOP));
workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.SERIAL_WAIT)); WorkflowExecutionStatus.SERIAL_WAIT));
workflowInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion, assertEquals(4, workflowInstanceDao
WorkflowExecutionStatus.WAIT_TO_RUN));
assertEquals(5, workflowInstanceDao
.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, workflowDefinitionVersion, status).size()); .queryByWorkflowCodeVersionStatus(workflowDefinitionCode, workflowDefinitionVersion, status).size());
} }

173
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicAsyncTaskExecuteFunction.java

@ -1,173 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_DYNAMIC_START_PARAMS;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DynamicAsyncTaskExecuteFunction {
private static final Duration TASK_EXECUTE_STATE_CHECK_INTERVAL = Duration.ofSeconds(10);
private static final String OUTPUT_KEY = "dynamic.out";
private final WorkflowInstance workflowInstance;
private final TaskInstance taskInstance;
private final SubWorkflowService subWorkflowService;
private final CommandMapper commandMapper;
private final int degreeOfParallelism;
private final DynamicLogicTask logicTask;
public DynamicAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionContext,
WorkflowInstance workflowInstance,
TaskInstance taskInstance,
DynamicLogicTask dynamicLogicTask,
CommandMapper commandMapper,
SubWorkflowService subWorkflowService,
int degreeOfParallelism) {
this.workflowInstance = workflowInstance;
this.taskInstance = taskInstance;
this.logicTask = dynamicLogicTask;
this.degreeOfParallelism = degreeOfParallelism;
this.commandMapper = commandMapper;
this.subWorkflowService = subWorkflowService;
}
public @NonNull TaskExecutionStatus getAsyncTaskExecutionStatus() {
List<WorkflowInstance> allSubWorkflowInstance = getAllSubProcessInstance();
int totalSubProcessInstanceCount = allSubWorkflowInstance.size();
List<WorkflowInstance> finishedSubWorkflowInstance =
subWorkflowService.filterFinishProcessInstances(allSubWorkflowInstance);
if (finishedSubWorkflowInstance.size() == totalSubProcessInstanceCount) {
log.info("all sub process instance finish");
int successCount = subWorkflowService.filterSuccessProcessInstances(finishedSubWorkflowInstance).size();
log.info("success sub process instance count: {}", successCount);
if (successCount == totalSubProcessInstanceCount) {
log.info("all sub process instance success");
setOutputParameters();
return TaskExecutionStatus.SUCCESS;
} else {
int failedCount = totalSubProcessInstanceCount - successCount;
log.info("failed sub process instance count: {}", failedCount);
return TaskExecutionStatus.FAILURE;
}
}
if (logicTask.isCancel()) {
return TaskExecutionStatus.FAILURE;
}
int runningCount = subWorkflowService.filterRunningProcessInstances(allSubWorkflowInstance).size();
int startCount = degreeOfParallelism - runningCount;
if (startCount > 0) {
log.info("There are {} sub process instances that can be started", startCount);
startSubProcessInstances(allSubWorkflowInstance, startCount);
}
// query the status of sub workflow instance
return TaskExecutionStatus.RUNNING_EXECUTION;
}
private void setOutputParameters() {
log.info("set varPool");
List<WorkflowInstance> allSubWorkflowInstance = getAllSubProcessInstance();
List<DynamicOutput> dynamicOutputs = new ArrayList<>();
int index = 1;
for (WorkflowInstance workflowInstance : allSubWorkflowInstance) {
DynamicOutput dynamicOutput = new DynamicOutput();
Map<String, String> dynamicParams =
JSONUtils.toMap(JSONUtils.toMap(workflowInstance.getCommandParam()).get(CMD_DYNAMIC_START_PARAMS));
dynamicOutput.setDynParams(dynamicParams);
Map<String, String> outputValueMap = new HashMap<>();
List<Property> propertyList = subWorkflowService.getWorkflowOutputParameters(workflowInstance);
for (Property property : propertyList) {
outputValueMap.put(property.getProp(), property.getValue());
}
dynamicOutput.setOutputValue(outputValueMap);
dynamicOutput.setMappedTimes(index++);
dynamicOutputs.add(dynamicOutput);
}
Property property = new Property();
property.setProp(String.format("%s(%s)", OUTPUT_KEY, taskInstance.getName()));
property.setDirect(Direct.OUT);
property.setType(DataType.VARCHAR);
property.setValue(JSONUtils.toJsonString(dynamicOutputs));
List<Property> taskPropertyList = new ArrayList<>(JSONUtils.toList(taskInstance.getVarPool(), Property.class));
taskPropertyList.add(property);
// logicTask.getTaskParameters().setVarPool(JSONUtils.toJsonString(taskPropertyList));
log.info("set property: {}", property);
}
private void startSubProcessInstances(List<WorkflowInstance> allSubWorkflowInstance, int startCount) {
List<WorkflowInstance> waitingWorkflowInstances =
subWorkflowService.filterWaitToRunProcessInstances(allSubWorkflowInstance);
for (int i = 0; i < Math.min(startCount, waitingWorkflowInstances.size()); i++) {
WorkflowInstance subWorkflowInstance = waitingWorkflowInstances.get(i);
Map<String, String> parameters = JSONUtils.toMap(DynamicCommandUtils
.getDataFromCommandParam(subWorkflowInstance.getCommandParam(), CMD_DYNAMIC_START_PARAMS));
Command command = DynamicCommandUtils.createCommand(this.workflowInstance,
subWorkflowInstance.getWorkflowDefinitionCode(), subWorkflowInstance.getWorkflowDefinitionVersion(),
parameters);
command.setWorkflowInstanceId(subWorkflowInstance.getId());
commandMapper.insert(command);
log.info("start sub process instance, sub process instance id: {}, command: {}",
subWorkflowInstance.getId(),
command);
}
}
public List<WorkflowInstance> getAllSubProcessInstance() {
return subWorkflowService.getAllDynamicSubWorkflow(workflowInstance.getId(), taskInstance.getTaskCode());
}
}

86
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicCommandUtils.java

@ -1,86 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
public class DynamicCommandUtils {
static public Command createCommand(WorkflowInstance workflowInstance,
Long subProcessDefinitionCode,
Integer subProcessDefinitionVersion,
Map<String, String> parameters) {
Command command = new Command();
if (workflowInstance.getCommandType().equals(CommandType.START_PROCESS)) {
command.setCommandType(CommandType.DYNAMIC_GENERATION);
} else {
command.setCommandType(workflowInstance.getCommandType());
}
command.setWorkflowDefinitionCode(subProcessDefinitionCode);
command.setWorkflowDefinitionVersion(subProcessDefinitionVersion);
command.setTaskDependType(TaskDependType.TASK_POST);
command.setFailureStrategy(workflowInstance.getFailureStrategy());
command.setWarningType(workflowInstance.getWarningType());
String globalParams = workflowInstance.getGlobalParams();
if (StringUtils.isNotEmpty(globalParams)) {
List<Property> parentParams = Lists.newArrayList(JSONUtils.toList(globalParams, Property.class));
for (Property parentParam : parentParams) {
parameters.put(parentParam.getProp(), parentParam.getValue());
}
}
addDataToCommandParam(command, CommandKeyConstants.CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(parameters));
command.setExecutorId(workflowInstance.getExecutorId());
command.setWarningGroupId(workflowInstance.getWarningGroupId());
command.setWorkflowInstancePriority(workflowInstance.getWorkflowInstancePriority());
command.setWorkerGroup(workflowInstance.getWorkerGroup());
command.setDryRun(workflowInstance.getDryRun());
command.setTenantCode(workflowInstance.getTenantCode());
return command;
}
static public String getDataFromCommandParam(String commandParam, String key) {
Map<String, String> cmdParam = JSONUtils.toMap(commandParam);
return cmdParam.get(key);
}
static void addDataToCommandParam(Command command, String key, String data) {
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
if (cmdParam == null) {
cmdParam = new HashMap<>();
}
cmdParam.put(key, data);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
}
}

339
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTask.java

@ -1,339 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.AbstractLogicTask;
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
@Slf4j
public class DynamicLogicTask extends AbstractLogicTask<DynamicParameters> {
public static final String TASK_TYPE = "DYNAMIC";
private final WorkflowInstanceDao workflowInstanceDao;
private final SubWorkflowService subWorkflowService;
private final WorkflowDefinitionMapper workflowDefinitionMapper;
private final CommandMapper commandMapper;
private final ProcessService processService;
private WorkflowInstance workflowInstance;
private TaskInstance taskInstance;
private final TaskExecutionContext taskExecutionContext;
private boolean haveBeenCanceled = false;
public DynamicLogicTask(TaskExecutionContext taskExecutionContext,
WorkflowInstanceDao workflowInstanceDao,
TaskInstanceDao taskInstanceDao,
SubWorkflowService subWorkflowService,
ProcessService processService,
WorkflowDefinitionMapper workflowDefinitionMapper,
CommandMapper commandMapper) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.workflowInstanceDao = workflowInstanceDao;
this.subWorkflowService = subWorkflowService;
this.processService = processService;
this.workflowDefinitionMapper = workflowDefinitionMapper;
this.commandMapper = commandMapper;
this.workflowInstance = workflowInstanceDao.queryById(taskExecutionContext.getWorkflowInstanceId());
this.taskInstance = taskInstanceDao.queryById(taskExecutionContext.getTaskInstanceId());
}
// public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws MasterTaskExecuteException {
// List<Map<String, String>> parameterGroup = generateParameterGroup();
//
// if (parameterGroup.size() > dynamicParameters.getMaxNumOfSubWorkflowInstances()) {
// log.warn("the number of sub process instances [{}] exceeds the maximum limit [{}]", parameterGroup.size(),
// dynamicParameters.getMaxNumOfSubWorkflowInstances());
// parameterGroup = parameterGroup.subList(0, dynamicParameters.getMaxNumOfSubWorkflowInstances());
// }
//
// // if already exists sub process instance, do not generate again
// List<WorkflowInstance> existsSubWorkflowInstanceList =
// subWorkflowService.getAllDynamicSubWorkflow(workflowInstance.getId(), taskInstance.getTaskCode());
// if (CollectionUtils.isEmpty(existsSubWorkflowInstanceList)) {
// generateSubWorkflowInstance(parameterGroup);
// } else {
// resetProcessInstanceStatus(existsSubWorkflowInstanceList);
// }
// return new DynamicAsyncTaskExecuteFunction(taskExecutionContext, workflowInstance, taskInstance, this,
// commandMapper,
// subWorkflowService, dynamicParameters.getDegreeOfParallelism());
// }
public void resetProcessInstanceStatus(List<WorkflowInstance> existsSubWorkflowInstanceList) {
switch (workflowInstance.getCommandType()) {
case REPEAT_RUNNING:
existsSubWorkflowInstanceList.forEach(processInstance -> {
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
workflowInstanceDao.updateById(processInstance);
});
break;
case START_FAILURE_TASK_PROCESS:
case RECOVER_TOLERANCE_FAULT_PROCESS:
List<WorkflowInstance> failedWorkflowInstances =
subWorkflowService.filterFailedProcessInstances(existsSubWorkflowInstanceList);
failedWorkflowInstances.forEach(processInstance -> {
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
workflowInstanceDao.updateById(processInstance);
});
break;
}
}
public void generateSubWorkflowInstance(List<Map<String, String>> parameterGroup) throws MasterTaskExecuteException {
List<WorkflowInstance> workflowInstanceList = new ArrayList<>();
WorkflowDefinition subWorkflowDefinition =
workflowDefinitionMapper.queryByCode(taskParameters.getWorkflowDefinitionCode());
for (Map<String, String> parameters : parameterGroup) {
String dynamicStartParams = JSONUtils.toJsonString(parameters);
Command command = DynamicCommandUtils.createCommand(workflowInstance, subWorkflowDefinition.getCode(),
subWorkflowDefinition.getVersion(), parameters);
// todo: set id to -1? we use command to generate sub process instance, but the generate method will use the
// command id to do
// somethings
command.setId(-1);
DynamicCommandUtils.addDataToCommandParam(command, CommandKeyConstants.CMD_DYNAMIC_START_PARAMS,
dynamicStartParams);
WorkflowInstance subWorkflowInstance = createSubProcessInstance(command);
subWorkflowInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
workflowInstanceDao.insert(subWorkflowInstance);
command.setWorkflowInstanceId(subWorkflowInstance.getId());
workflowInstanceList.add(subWorkflowInstance);
}
List<RelationSubWorkflow> relationSubWorkflowList = new ArrayList<>();
for (WorkflowInstance subWorkflowInstance : workflowInstanceList) {
RelationSubWorkflow relationSubWorkflow = new RelationSubWorkflow();
relationSubWorkflow.setParentWorkflowInstanceId(Long.valueOf(workflowInstance.getId()));
relationSubWorkflow.setParentTaskCode(taskInstance.getTaskCode());
relationSubWorkflow.setSubWorkflowInstanceId(Long.valueOf(subWorkflowInstance.getId()));
relationSubWorkflowList.add(relationSubWorkflow);
}
log.info("Expected number of runs : {}, actual number of runs : {}", parameterGroup.size(),
workflowInstanceList.size());
int insertN = subWorkflowService.batchInsertRelationSubWorkflow(relationSubWorkflowList);
log.info("insert {} relation sub workflow", insertN);
}
public WorkflowInstance createSubProcessInstance(Command command) throws MasterTaskExecuteException {
WorkflowInstance subWorkflowInstance;
try {
subWorkflowInstance = processService.constructWorkflowInstance(command, workflowInstance.getHost());
subWorkflowInstance.setIsSubWorkflow(Flag.YES);
subWorkflowInstance.setVarPool(taskExecutionContext.getVarPool());
} catch (Exception e) {
log.error("create sub process instance error", e);
throw new MasterTaskExecuteException(e.getMessage());
}
return subWorkflowInstance;
}
public List<Map<String, String>> generateParameterGroup() {
List<DynamicInputParameter> dynamicInputParameters = getDynamicInputParameters();
Set<String> filterStrings =
Arrays.stream(StringUtils.split(taskParameters.getFilterCondition(), ",")).map(String::trim)
.collect(Collectors.toSet());
List<List<DynamicInputParameter>> allParameters = new ArrayList<>();
for (DynamicInputParameter dynamicInputParameter : dynamicInputParameters) {
List<DynamicInputParameter> singleParameters = new ArrayList<>();
String value = dynamicInputParameter.getValue();
String separator = dynamicInputParameter.getSeparator();
List<String> valueList =
Arrays.stream(StringUtils.split(value, separator)).map(String::trim).collect(Collectors.toList());
valueList = valueList.stream().filter(v -> !filterStrings.contains(v)).collect(Collectors.toList());
for (String v : valueList) {
DynamicInputParameter singleParameter = new DynamicInputParameter();
singleParameter.setName(dynamicInputParameter.getName());
singleParameter.setValue(v);
singleParameters.add(singleParameter);
}
allParameters.add(singleParameters);
}
// use Sets.cartesianProduct to get the cartesian product of all parameters
List<List<DynamicInputParameter>> cartesianProduct = Lists.cartesianProduct(allParameters);
// convert cartesian product to parameter group List<Map<name:value>>
List<Map<String, String>> parameterGroup = cartesianProduct.stream().map(
inputParameterList -> inputParameterList.stream().collect(
Collectors.toMap(DynamicInputParameter::getName, DynamicInputParameter::getValue)))
.collect(Collectors.toList());
log.info("parameter group size: {}", parameterGroup.size());
// log every parameter group
if (CollectionUtils.isNotEmpty(parameterGroup)) {
for (Map<String, String> map : parameterGroup) {
log.info("parameter group: {}", map);
}
}
return parameterGroup;
}
private List<DynamicInputParameter> getDynamicInputParameters() {
List<DynamicInputParameter> dynamicInputParameters = taskParameters.getListParameters();
if (CollectionUtils.isNotEmpty(dynamicInputParameters)) {
for (DynamicInputParameter dynamicInputParameter : dynamicInputParameters) {
String value = dynamicInputParameter.getValue();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
value = ParameterUtils.convertParameterPlaceholders(value, ParameterUtils.convert(paramsMap));
dynamicInputParameter.setValue(value);
}
}
return dynamicInputParameters;
}
@Override
public void start() throws MasterTaskExecuteException {
// todo:
}
@Override
public TaskExecutionStatus getTaskExecutionState() {
return taskExecutionContext.getCurrentExecutionStatus();
}
@Override
public void pause() throws MasterTaskExecuteException {
// todo: support pause
}
@Override
public void kill() {
try {
doKillSubWorkflowInstances();
} catch (MasterTaskExecuteException e) {
log.error("kill {} error", taskInstance.getName(), e);
}
}
@Override
public ITaskParameterDeserializer<DynamicParameters> getTaskParameterDeserializer() {
return taskParamsJson -> JSONUtils.parseObject(taskParamsJson, new TypeReference<DynamicParameters>() {
});
}
private void doKillSubWorkflowInstances() throws MasterTaskExecuteException {
List<WorkflowInstance> existsSubWorkflowInstanceList =
subWorkflowService.getAllDynamicSubWorkflow(workflowInstance.getId(), taskInstance.getTaskCode());
if (CollectionUtils.isEmpty(existsSubWorkflowInstanceList)) {
return;
}
commandMapper.deleteByWorkflowInstanceIds(
existsSubWorkflowInstanceList.stream().map(WorkflowInstance::getId).collect(Collectors.toList()));
List<WorkflowInstance> runningSubWorkflowInstanceList =
subWorkflowService.filterRunningProcessInstances(existsSubWorkflowInstanceList);
doKillRunningSubWorkflowInstances(runningSubWorkflowInstanceList);
List<WorkflowInstance> waitToRunWorkflowInstances =
subWorkflowService.filterWaitToRunProcessInstances(existsSubWorkflowInstanceList);
doKillWaitToRunSubWorkflowInstances(waitToRunWorkflowInstances);
this.haveBeenCanceled = true;
}
private void doKillRunningSubWorkflowInstances(List<WorkflowInstance> runningSubWorkflowInstanceList) throws MasterTaskExecuteException {
for (WorkflowInstance subWorkflowInstance : runningSubWorkflowInstanceList) {
try {
WorkflowInstanceStopResponse workflowInstanceStopResponse = Clients
.withService(IWorkflowControlClient.class)
.withHost(subWorkflowInstance.getHost())
.stopWorkflowInstance(new WorkflowInstanceStopRequest(subWorkflowInstance.getId()));
if (workflowInstanceStopResponse.isSuccess()) {
log.info("Stop SubWorkflow: {} successfully", subWorkflowInstance.getName());
} else {
throw new MasterTaskExecuteException(
"Stop subWorkflow: " + subWorkflowInstance.getName() + " failed");
}
} catch (MasterTaskExecuteException me) {
throw me;
} catch (Exception e) {
throw new MasterTaskExecuteException(
String.format("Send stop request to SubWorkflow's master: %s failed",
subWorkflowInstance.getHost()),
e);
}
}
}
private void doKillWaitToRunSubWorkflowInstances(List<WorkflowInstance> waitToRunWorkflowInstances) {
for (WorkflowInstance subWorkflowInstance : waitToRunWorkflowInstances) {
subWorkflowInstance.setState(WorkflowExecutionStatus.STOP);
workflowInstanceDao.updateById(subWorkflowInstance);
}
}
public boolean isCancel() {
return haveBeenCanceled;
}
}

74
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicLogicTaskPluginFactory.java

@ -1,74 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.ILogicTaskPluginFactory;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DynamicLogicTaskPluginFactory implements ILogicTaskPluginFactory<DynamicLogicTask> {
@Autowired
private WorkflowInstanceDao workflowInstanceDao;
@Autowired
private TaskInstanceDao taskInstanceDao;
@Autowired
private WorkflowDefinitionMapper processDefineMapper;
@Autowired
private CommandMapper commandMapper;
@Autowired
private ProcessService processService;
@Autowired
SubWorkflowService subWorkflowService;
@Override
public DynamicLogicTask createLogicTask(final ITaskExecutor taskExecutor) {
final TaskExecutionContext taskExecutionContext = taskExecutor.getTaskExecutionContext();
return new DynamicLogicTask(taskExecutionContext,
workflowInstanceDao,
taskInstanceDao,
subWorkflowService,
processService,
processDefineMapper,
commandMapper);
}
@Override
public String getTaskType() {
return DynamicLogicTask.TASK_TYPE;
}
}

33
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/dynamic/DynamicOutput.java

@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.executor.plugin.dynamic;
import java.util.Map;
import lombok.Data;
@Data
public class DynamicOutput {
private Map<String, String> dynParams;
private Map<String, String> outputValue;
private int mappedTimes;
}

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/IWorkflowStateAction.java

@ -43,7 +43,6 @@ import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkf
* @see WorkflowFailedStateAction * @see WorkflowFailedStateAction
* @see WorkflowSuccessStateAction * @see WorkflowSuccessStateAction
* @see WorkflowFailoverStateAction * @see WorkflowFailoverStateAction
* @see WorkflowWaitToRunStateAction
*/ */
public interface IWorkflowStateAction { public interface IWorkflowStateAction {

117
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowWaitToRunStateAction.java

@ -1,117 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.workflow.statemachine;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPausedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStopLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStoppedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowSucceedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class WorkflowWaitToRunStateAction extends AbstractWorkflowStateAction {
@Override
public void startEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowStartLifecycleEvent workflowStartEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowStartEvent);
}
@Override
public void topologyLogicalTransitionEventAction(
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent workflowTopologyLogicalTransitionWithTaskFinishEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowTopologyLogicalTransitionWithTaskFinishEvent);
}
@Override
public void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowPauseLifecycleEvent workflowPauseEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowPauseEvent);
}
@Override
public void pausedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowPausedLifecycleEvent workflowPausedEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowPausedEvent);
}
@Override
public void stopEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowStopLifecycleEvent workflowStopEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowStopEvent);
}
@Override
public void stoppedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowStoppedLifecycleEvent workflowStoppedEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowStoppedEvent);
}
@Override
public void succeedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowSucceedLifecycleEvent workflowSucceedEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowSucceedEvent);
}
@Override
public void failedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowFailedLifecycleEvent workflowFailedEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowFailedEvent);
}
@Override
public void finalizeEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowFinalizeLifecycleEvent workflowFinalizeEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowFinalizeEvent);
}
@Override
public WorkflowExecutionStatus matchState() {
return WorkflowExecutionStatus.WAIT_TO_RUN;
}
/**
* The running state can only finish with success/failure.
*/
@Override
protected void emitWorkflowFinishedEventIfApplicable(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
log.warn("The workflow: {} is in wait_to_run state, shouldn't emit workflow finished event",
workflowExecutionRunnable.getName());
}
}

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java

@ -38,8 +38,6 @@ public interface SubWorkflowService {
List<WorkflowInstance> filterRunningProcessInstances(List<WorkflowInstance> workflowInstanceList); List<WorkflowInstance> filterRunningProcessInstances(List<WorkflowInstance> workflowInstanceList);
List<WorkflowInstance> filterWaitToRunProcessInstances(List<WorkflowInstance> workflowInstanceList);
List<WorkflowInstance> filterFailedProcessInstances(List<WorkflowInstance> workflowInstanceList); List<WorkflowInstance> filterFailedProcessInstances(List<WorkflowInstance> workflowInstanceList);
List<Property> getWorkflowOutputParameters(WorkflowInstance workflowInstance); List<Property> getWorkflowOutputParameters(WorkflowInstance workflowInstance);

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.subworkflow; package org.apache.dolphinscheduler.service.subworkflow;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow; import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinitionLog;
@ -84,13 +83,6 @@ public class SubWorkflowServiceImpl implements SubWorkflowService {
.filter(subProcessInstance -> subProcessInstance.getState().isRunning()).collect(Collectors.toList()); .filter(subProcessInstance -> subProcessInstance.getState().isRunning()).collect(Collectors.toList());
} }
@Override
public List<WorkflowInstance> filterWaitToRunProcessInstances(List<WorkflowInstance> workflowInstanceList) {
return workflowInstanceList.stream()
.filter(subProcessInstance -> subProcessInstance.getState().equals(WorkflowExecutionStatus.WAIT_TO_RUN))
.collect(Collectors.toList());
}
@Override @Override
public List<WorkflowInstance> filterFailedProcessInstances(List<WorkflowInstance> workflowInstanceList) { public List<WorkflowInstance> filterFailedProcessInstances(List<WorkflowInstance> workflowInstanceList) {
return workflowInstanceList.stream() return workflowInstanceList.stream()

30
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannel.java

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.task;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
public class DynamicLogicTaskChannel extends AbstractLogicTaskChannel {
@Override
public AbstractParameters parseParameters(String taskParams) {
return JSONUtils.parseObject(taskParams, DynamicParameters.class);
}
}

38
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/task/DynamicLogicTaskChannelFactory.java

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.task;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import com.google.auto.service.AutoService;
@AutoService(TaskChannelFactory.class)
public class DynamicLogicTaskChannelFactory implements TaskChannelFactory {
public static final String NAME = "DYNAMIC";
@Override
public String getName() {
return NAME;
}
@Override
public TaskChannel create() {
return new DynamicLogicTaskChannel();
}
}

5
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskTypeUtils.java

@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.plugin.task.api.ILogicTaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.DynamicLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory;
@ -50,10 +49,6 @@ public class TaskTypeUtils {
return DependentLogicTaskChannelFactory.NAME.equals(taskType); return DependentLogicTaskChannelFactory.NAME.equals(taskType);
} }
public boolean isDynamicTask(String taskType) {
return DynamicLogicTaskChannelFactory.NAME.equals(taskType);
}
public boolean isLogicTask(String taskType) { public boolean isLogicTask(String taskType) {
checkArgument(StringUtils.isNotEmpty(taskType), "taskType cannot be empty"); checkArgument(StringUtils.isNotEmpty(taskType), "taskType cannot be empty");
return TaskPluginManager.getTaskChannel(taskType) instanceof ILogicTaskChannel; return TaskPluginManager.getTaskChannel(taskType) instanceof ILogicTaskChannel;

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManagerTest.java

@ -21,7 +21,6 @@ import static com.google.common.truth.Truth.assertThat;
import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.DynamicLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory;
@ -34,7 +33,6 @@ class TaskPluginManagerTest {
@ValueSource(strings = { @ValueSource(strings = {
ConditionsLogicTaskChannelFactory.NAME, ConditionsLogicTaskChannelFactory.NAME,
DependentLogicTaskChannelFactory.NAME, DependentLogicTaskChannelFactory.NAME,
DynamicLogicTaskChannelFactory.NAME,
SubWorkflowLogicTaskChannelFactory.NAME, SubWorkflowLogicTaskChannelFactory.NAME,
SwitchLogicTaskChannelFactory.NAME}) SwitchLogicTaskChannelFactory.NAME})
void testGetTaskChannel_logicTaskChannel(String type) { void testGetTaskChannel_logicTaskChannel(String type) {

BIN
dolphinscheduler-ui/public/images/task-icons/dynamic.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 10 KiB

BIN
dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

3
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -894,9 +894,6 @@ export default {
filter_condition: 'Filter Condition', filter_condition: 'Filter Condition',
params_value: 'Params Value', params_value: 'Params Value',
separator: 'Separator', separator: 'Separator',
dynamic_name_tips: 'name(required)',
dynamic_value_tips: 'params or value(required)',
dynamic_separator_tips: 'separator(required)',
child_node_definition: 'child node definition', child_node_definition: 'child node definition',
child_node_instance: 'child node instance', child_node_instance: 'child node instance',
yarn_queue: 'Yarn Queue', yarn_queue: 'Yarn Queue',

3
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -864,9 +864,6 @@ export default {
filter_condition: '过滤条件', filter_condition: '过滤条件',
params_value: '取值参数', params_value: '取值参数',
separator: '分隔符', separator: '分隔符',
dynamic_name_tips: 'name(必填)',
dynamic_value_tips: 'params or value(必填)',
dynamic_separator_tips: '分隔符(必填)',
child_node_definition: '子节点定义', child_node_definition: '子节点定义',
child_node_instance: '子节点实例', child_node_instance: '子节点实例',
yarn_queue: 'Yarn队列', yarn_queue: 'Yarn队列',

3
dolphinscheduler-ui/src/store/project/task-type.ts

@ -32,9 +32,6 @@ export const TASK_TYPES_MAP = {
SUB_WORKFLOW: { SUB_WORKFLOW: {
alias: 'SUB_WORKFLOW' alias: 'SUB_WORKFLOW'
}, },
DYNAMIC: {
alias: 'DYNAMIC'
},
PROCEDURE: { PROCEDURE: {
alias: 'PROCEDURE' alias: 'PROCEDURE'
}, },

1
dolphinscheduler-ui/src/store/project/types.ts

@ -23,7 +23,6 @@ type TaskExecuteType = 'STREAM' | 'BATCH'
type TaskType = type TaskType =
| 'SHELL' | 'SHELL'
| 'SUB_WORKFLOW' | 'SUB_WORKFLOW'
| 'DYNAMIC'
| 'PROCEDURE' | 'PROCEDURE'
| 'SQL' | 'SQL'
| 'SPARK' | 'SPARK'

4
dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx

@ -179,9 +179,7 @@ const NodeDetailModal = defineComponent({
}, },
{ {
text: t('project.node.enter_this_child_node'), text: t('project.node.enter_this_child_node'),
show: show: props.data.taskType === 'SUB_WORKFLOW',
props.data.taskType === 'SUB_WORKFLOW' ||
props.data.taskType === 'DYNAMIC',
disabled: disabled:
!props.data.id || !props.data.id ||
(router.currentRoute.value.name === 'workflow-instance-detail' && (router.currentRoute.value.name === 'workflow-instance-detail' &&

1
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts

@ -88,6 +88,5 @@ export { useKubeflow } from './use-kubeflow'
export { useLinkis } from './use-linkis' export { useLinkis } from './use-linkis'
export { useDataFactory } from './use-data-factory' export { useDataFactory } from './use-data-factory'
export { useRemoteShell } from './use-remote-shell' export { useRemoteShell } from './use-remote-shell'
export { useDynamic } from './use-dynamic'
export { useYarnQueue } from './use-queue' export { useYarnQueue } from './use-queue'
export { useAliyunServerlessSpark } from './use-aliyun-serverless-spark' export { useAliyunServerlessSpark } from './use-aliyun-serverless-spark'

120
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts

@ -1,120 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type { IJsonItem } from '../types'
import { useI18n } from 'vue-i18n'
export function useDynamic(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
return [
{
type: 'input-number',
field: 'maxNumOfSubWorkflowInstances',
span: 12,
name: t('project.node.max_num_of_sub_workflow_instances'),
validate: {
required: true
}
},
{
type: 'input-number',
field: 'degreeOfParallelism',
span: 12,
name: t('project.node.parallelism'),
validate: {
required: true
}
},
{
type: 'custom-parameters',
field: 'listParameters',
name: t('project.node.params_value'),
span: 24,
children: [
{
type: 'input',
field: 'name',
span: 8,
props: {
placeholder: t('project.node.dynamic_name_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.dynamic_name_tips'))
}
const sameItems = model['listParameters'].filter(
(item: { name: string }) => item.name === value
)
if (sameItems.length > 1) {
return new Error(t('project.node.prop_repeat'))
}
}
}
},
{
type: 'input',
field: 'value',
span: 8,
props: {
placeholder: t('project.node.dynamic_value_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.dynamic_value_tips'))
}
}
}
},
{
type: 'input',
field: 'separator',
span: 4,
props: {
placeholder: t('project.node.dynamic_separator_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.dynamic_separator_tips'))
}
}
}
}
]
},
{
type: 'input',
field: 'filterCondition',
span: 24,
name: t('project.node.filter_condition')
}
]
}

10
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -34,7 +34,7 @@ export function formatParams(data: INodeData): {
} { } {
const rdbmsSourceTypes = ref(['MYSQL', 'ORACLE', 'SQLSERVER', 'HANA']) const rdbmsSourceTypes = ref(['MYSQL', 'ORACLE', 'SQLSERVER', 'HANA'])
const taskParams: ITaskParams = {} const taskParams: ITaskParams = {}
if (data.taskType === 'SUB_WORKFLOW' || data.taskType === 'DYNAMIC') { if (data.taskType === 'SUB_WORKFLOW') {
taskParams.workflowDefinitionCode = data.workflowDefinitionCode taskParams.workflowDefinitionCode = data.workflowDefinitionCode
} }
@ -454,14 +454,6 @@ export function formatParams(data: INodeData): {
taskParams.datasource = data.datasource taskParams.datasource = data.datasource
} }
if (data.taskType === 'DYNAMIC') {
taskParams.workflowDefinitionCode = data.workflowDefinitionCode
taskParams.maxNumOfSubWorkflowInstances = data.maxNumOfSubWorkflowInstances
taskParams.degreeOfParallelism = data.degreeOfParallelism
taskParams.filterCondition = data.filterCondition
taskParams.listParameters = data.listParameters
}
let timeoutNotifyStrategy = '' let timeoutNotifyStrategy = ''
if (data.timeoutNotifyStrategy) { if (data.timeoutNotifyStrategy) {
if (data.timeoutNotifyStrategy.length === 1) { if (data.timeoutNotifyStrategy.length === 1) {

2
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts

@ -50,13 +50,11 @@ import { useKubeflow } from './use-kubeflow'
import { useLinkis } from './use-linkis' import { useLinkis } from './use-linkis'
import { useDataFactory } from './use-data-factory' import { useDataFactory } from './use-data-factory'
import { useRemoteShell } from './use-remote-shell' import { useRemoteShell } from './use-remote-shell'
import { useDynamic } from './use-dynamic'
import { useAliyunServerlessSpark } from './use-aliyun-serverless-spark' import { useAliyunServerlessSpark } from './use-aliyun-serverless-spark'
export default { export default {
SHELL: useShell, SHELL: useShell,
SUB_WORKFLOW: useSubWorkflow, SUB_WORKFLOW: useSubWorkflow,
DYNAMIC: useDynamic,
PYTHON: usePython, PYTHON: usePython,
SPARK: useSpark, SPARK: useSpark,
MR: useMr, MR: useMr,

83
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts

@ -1,83 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import { useRouter } from 'vue-router'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData } from '../types'
import { ITaskData } from '../types'
export function useDynamic({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const router = useRouter()
const workflowCode = router.currentRoute.value.params.code
const model = reactive({
taskType: 'DYNAMIC',
name: '',
flag: 'YES',
description: '',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
delayTime: 0,
timeout: 30,
maxNumOfSubWorkflowInstances: 1024,
degreeOfParallelism: 1,
filterCondition: '',
listParameters: [{ name: null, value: null, separator: ',' }]
} as INodeData)
if (model.listParameters?.length) {
model.listParameters[0].disabled = true
}
return {
json: [
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(projectCode),
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useTimeoutAlarm(model),
Fields.useChildNode({
model,
projectCode,
from,
workflowName: data?.workflowDefinitionName,
code: from === 1 ? 0 : Number(workflowCode)
}),
...Fields.useDynamic(model),
Fields.usePreTasks()
] as IJsonItem[],
model
}
}

4
dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts

@ -17,7 +17,6 @@
export type TaskType = export type TaskType =
| 'SHELL' | 'SHELL'
| 'SUB_WORKFLOW' | 'SUB_WORKFLOW'
| 'DYNAMIC'
| 'PROCEDURE' | 'PROCEDURE'
| 'SQL' | 'SQL'
| 'SPARK' | 'SPARK'
@ -65,9 +64,6 @@ export const TASK_TYPES_MAP = {
SUB_WORKFLOW: { SUB_WORKFLOW: {
alias: 'SUB_WORKFLOW' alias: 'SUB_WORKFLOW'
}, },
DYNAMIC: {
alias: 'DYNAMIC'
},
PROCEDURE: { PROCEDURE: {
alias: 'PROCEDURE' alias: 'PROCEDURE'
}, },

6
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

@ -107,9 +107,6 @@ $bgLight: #ffffff;
&.icon-sub_workflow { &.icon-sub_workflow {
background-image: url('/images/task-icons/sub_workflow.png'); background-image: url('/images/task-icons/sub_workflow.png');
} }
&.icon-dynamic {
background-image: url('/images/task-icons/dynamic.png');
}
&.icon-procedure { &.icon-procedure {
background-image: url('/images/task-icons/procedure.png'); background-image: url('/images/task-icons/procedure.png');
} }
@ -220,9 +217,6 @@ $bgLight: #ffffff;
&.icon-sub_workflow { &.icon-sub_workflow {
background-image: url('/images/task-icons/sub_workflow_hover.png'); background-image: url('/images/task-icons/sub_workflow_hover.png');
} }
&.icon-dynamic {
background-image: url('/images/task-icons/dynamic_hover.png');
}
&.icon-procedure { &.icon-procedure {
background-image: url('/images/task-icons/procedure_hover.png'); background-image: url('/images/task-icons/procedure_hover.png');
} }

Loading…
Cancel
Save