Browse Source

[Feature][Task Plugin] Add Dynamic task to generate dynamic worklfows with list parameters (#14127)

* add dynamic task plugin

* fix ui

* update code

* add DOC

* ADD UT

* fix doc

* fix sanity check

* add upgrade ddl
* fix sql
3.2.1-prepare
JieguangZhou 1 year ago committed by GitHub
parent
commit
e7cdc7c136
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      docs/configs/docsdev.js
  2. 77
      docs/docs/en/guide/task/dynamic.md
  3. 76
      docs/docs/zh/guide/task/dynamic.md
  4. BIN
      docs/img/tasks/demo/dynamic_definition.png
  5. BIN
      docs/img/tasks/demo/dynamic_running.png
  6. BIN
      docs/img/tasks/icons/dynamic.png
  7. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  8. 41
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java
  9. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  10. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  11. 68
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  12. 1
      dolphinscheduler-api/src/main/resources/task-type-config.yaml
  13. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java
  14. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
  15. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
  16. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
  17. 50
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java
  18. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  19. 40
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java
  20. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
  21. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
  22. 44
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml
  23. 13
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  24. 13
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  25. 13
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  26. 24
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
  27. 24
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
  28. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
  29. 24
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  30. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
  31. 178
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java
  32. 85
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java
  33. 309
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
  34. 72
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskPluginFactory.java
  35. 33
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicOutput.java
  36. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java
  37. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java
  38. 141
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunctionTest.java
  39. 133
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtilsTest.java
  40. 185
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskTest.java
  41. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  42. 6
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  43. 46
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java
  44. 119
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java
  45. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
  46. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java
  47. 33
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DynamicInputParameter.java
  48. 53
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DynamicParameters.java
  49. BIN
      dolphinscheduler-ui/public/images/task-icons/dynamic.png
  50. BIN
      dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png
  51. 17
      dolphinscheduler-ui/src/common/common.ts
  52. 1
      dolphinscheduler-ui/src/common/types.ts
  53. 12
      dolphinscheduler-ui/src/locales/en_US/project.ts
  54. 12
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  55. 3
      dolphinscheduler-ui/src/store/project/task-type.ts
  56. 1
      dolphinscheduler-ui/src/store/project/types.ts
  57. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx
  58. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
  59. 120
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts
  60. 10
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  61. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
  62. 83
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts
  63. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
  64. 4
      dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
  65. 6
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

8
docs/configs/docsdev.js

@ -101,6 +101,10 @@ export default {
title: 'SubProcess', title: 'SubProcess',
link: '/en-us/docs/dev/user_doc/guide/task/sub-process.html', link: '/en-us/docs/dev/user_doc/guide/task/sub-process.html',
}, },
{
title: 'Dynamic',
link: '/en-us/docs/dev/user_doc/guide/task/dynamic.html',
},
{ {
title: 'Dependent', title: 'Dependent',
link: '/en-us/docs/dev/user_doc/guide/task/dependent.html', link: '/en-us/docs/dev/user_doc/guide/task/dependent.html',
@ -810,6 +814,10 @@ export default {
title: 'SubProcess', title: 'SubProcess',
link: '/zh-cn/docs/dev/user_doc/guide/task/sub-process.html', link: '/zh-cn/docs/dev/user_doc/guide/task/sub-process.html',
}, },
{
title: 'Dynamic',
link: '/zh-cn/docs/dev/user_doc/guide/task/dynamic.html',
},
{ {
title: 'Dependent', title: 'Dependent',
link: '/zh-cn/docs/dev/user_doc/guide/task/dependent.html', link: '/zh-cn/docs/dev/user_doc/guide/task/dependent.html',

77
docs/docs/en/guide/task/dynamic.md

@ -0,0 +1,77 @@
# Dynamic Task
## Overview
Dynamic task can input multiple parameter lists, calculate all parameter combinations through Cartesian product, and then execute each parameter combination as a sub-workflow node.
For example, we have a workflow with two input parameters, a, b.
We can use the dynamic node to define this workflow definition as a node, and then enter the parameter list
- Parameter a: a1, a2
- Parameter b: b1, b2
Then the dynamic node will calculate four parameter combinations, which are
- a1, b1
- a1, b2
- a2, b1
- a2, b2
Then execute these four parameter combinations as the startup parameters of the sub-workflow node, and a total of four sub-workflow nodes are generated.
## Create Task
- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the "Create Workflow" button to enter the
DAG editing page.
- Drag from the toolbar <img src="../../../../img/tasks/icons/dynamic.png" width="15"/> task node to canvas.
The task definition is shown in the following figure:
![dynamic_definition](../../../../img/tasks/demo/dynamic_definition.png)
## Task Parameters
[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
[//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix]&#40;appendix.md#default-task-parameters&#41; `Default Task Parameters` section for default parameters.)
- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
| **Task Parameters** | **Description** |
|-----------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Child Node | Select the workflow definition of the sub-workflow. You can jump to the workflow definition of the selected sub-workflow by entering the sub-node in the upper right corner. |
| max num of sub workflow instances | The maximum number of sub-workflow instances dynamically generated. After exceeding this upper limit, the dynamically generated sub-workflow instances will no longer be executed. |
| Parallelism | The parallelism of the sub-workflow instances dynamically generated, that is, the number of sub-workflow instances executed at the same time. |
| Param Value | The parameter of the sub-workflow instance dynamically generated, supports multiple parameters, and the parameters are separated by delimiters. |
| Filter Condition | The filter condition of the sub-workflow instance dynamically generated, supports multiple filter values, and the filter conditions are separated by commas, such as `2022,2023`, which will filter the parameter groups containing the values of 2022 and 2023. |
## Task Parameters Output
The output parameters of the dynamic node refer to the output parameters of the sub-workflow. The output parameters of all sub-workflows will be collected into a list as the output parameters of the dynamic node.
When the downstream task is referenced, it can be referenced by `${dynamic.out(TaskName)}`.
The value is a json, as shown below
```Json
[
{ "dynParams":{ "a":"a1", "b":"b1" }, "outputValue":{ "p":"a1-b1" }, "mappedTimes":1 },
{ "dynParams":{ "a":"a2", "b":"b1" }, "outputValue":{ "p":"a2-b1" }, "mappedTimes":2 },
{ "dynParams":{ "a":"a3", "b":"b1" }, "outputValue":{ "p":"a3-b1" }, "mappedTimes":3 }
]
```
- `dynParams` the input parameters of the sub-workflow
- `outputValue` is the output parameter of the sub-workflow. For example, the `p` here is a string that splices the output parameters `a` and `b` of the sub-workflow and outputs them in the form of variables `p`
- `mappedTimes` is the index of the execution of the sub-workflow, starting from 1
## Running Status
After the dynamic task is started, all parameter combinations will be calculated according to the input parameter list, and then a sub-workflow instance will be created for each parameter combination.
When the dynamic task is running, it will periodically check the statistical information of all current sub-workflow instances. If the parallelism is greater than the number of sub-workflow instances running, it will trigger the start of the appropriate number of sub-workflow instances (the sub-workflow instances are created first, and then the start is triggered later).
As shown below.
![dynamic_running](../../../../img/tasks/demo/dynamic_running.png)
The dynamic task will run successfully only when all sub-workflow instances are running successfully.

76
docs/docs/zh/guide/task/dynamic.md

@ -0,0 +1,76 @@
# 动态节点
## 综述
动态节点可以通过输入多个参数列表,通过笛卡尔积计算出多所有的参数组合,然后将每个参数组合作为一个子工作流节点执行。
比如我们有一个工作流,它具有两个输入参数,a, b。
我们可以通过动态节点,将这个工作流定义当做一个节点,然后输入参数列表
- 参数a:a1, a2
- 参数b:b1, b2
那么动态节点会计算出四个参数组合,分别是
- a1, b1
- a1, b2
- a2, b1
- a2, b2
然后将这四个参数组合作为子工作流节点的启动参数执行,共生成四个子工作流节点。
## 创建任务
- 点击项目管理 -> 项目名称 -> 工作流定义,点击”创建工作流”按钮,进入 DAG 编辑页面:
- 拖动工具栏的 <img src="../../../../img/tasks/icons/dynamic.png" width="15"/> 任务节点到画板中。
任务定义如下图所示:
![dynamic_definition](../../../../img/tasks/demo/dynamic_definition.png)
## 任务参数
[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
[//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录]&#40;appendix.md#默认任务参数&#41;`默认任务参数`一栏。)
- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
| **任务参数** | **描述** |
|----------|------------------------------------------------------------------------------|
| 子节点 | 是选择子工作流的工作流定义,右上角进入该子节点可以跳转到所选子工作流的工作流定义 |
| 动态生成实例上限 | 是指动态生成的子工作流实例的上限,超过该上限后,动态生成的子工作流实例将不再执行 |
| 并行度 | 是指动态生成的子工作流实例的并行度,即同时执行的子工作流实例的数量 |
| 取值参数 | 是指动态生成的子工作流实例的参数,支持多个参数,参数之间用分隔符分隔 |
| 过滤条件 | 是指动态生成的子工作流实例的过滤条件,支持多个过滤值,过滤条件之间用逗号分隔, 如 `2022,2023`, 则会过来包含2022和2023的值的参数组 |
## 任务参数输出
动态节点的输出参数,是指子工作流的输出参数,所有子工作流的输出参数都会被收集到一个列表中,作为动态节点的输出参数。
下游任务引用的时候,可以通过 `${dynamic.out(TaskName)}` 的方式引用。
值为一个json,样例如下
```Json
[
{ "dynParams":{ "a":"a1", "b":"b1" }, "outputValue":{ "p":"a1-b1" }, "mappedTimes":1 },
{ "dynParams":{ "a":"a2", "b":"b1" }, "outputValue":{ "p":"a2-b1" }, "mappedTimes":2 },
{ "dynParams":{ "a":"a3", "b":"b1" }, "outputValue":{ "p":"a3-b1" }, "mappedTimes":3 }
]
```
其中
- `dynParams` 是子工作流的输入参数
- `outputValue` 是子工作流的输出参数, 如这里的`p`为将子工作流的输出参数`a`和`b`拼接起来的字符串,以变量`p`的形式输出
- `mappedTimes` 是子工作流的执行的index,从1开始
## 运行状态
Dynamic任务启动后,会根据输入参数列表,计算出所有的参数组合,然后对每一个参数组合,创建一个子工作流实例。
Dynamic运行时,会定期检测当前所有子工作流实例的统计信息,如果并行度大于运行中的子工作流实例的数量,则会触发启动合适数量的工作流实例(工作流实例是先创建,后续再出发启动)。
如下如所示。
![dynamic_running](../../../../img/tasks/demo/dynamic_running.png)
当且进度所有的子工作流实例全部运行成功时,dynamic任务才会运行成功。

BIN
docs/img/tasks/demo/dynamic_definition.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

BIN
docs/img/tasks/demo/dynamic_running.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

BIN
docs/img/tasks/icons/dynamic.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
@ -319,6 +320,28 @@ public class ProcessInstanceController extends BaseController {
return returnDataList(result); return returnDataList(result);
} }
/**
* query dynamic sub process instance detail info by task id
*
* @param loginUser login user
* @param taskId task id
* @return sub process instance detail
*/
@Operation(summary = "queryDynamicSubWorkflowInstances", description = "QUERY_DYNAMIC_SUBPROCESS_INSTANCE_BY_TASK_CODE_NOTES")
@Parameters({
@Parameter(name = "taskId", description = "taskInstanceId", required = true, schema = @Schema(implementation = int.class, example = "100"))
})
@GetMapping(value = "/query-dynamic-sub-workflows")
@ResponseStatus(HttpStatus.OK)
@ApiException(Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<List<DynamicSubWorkflowDto>> queryDynamicSubWorkflowInstances(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("taskId") Integer taskId) {
List<DynamicSubWorkflowDto> dynamicSubWorkflowDtos =
processInstanceService.queryDynamicSubWorkflowInstances(loginUser, taskId);
return new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg(), dynamicSubWorkflowDtos);
}
/** /**
* query process instance global variables and local variables * query process instance global variables and local variables
* *

41
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class DynamicSubWorkflowDto {
private long processInstanceId;
private String name;
private long index;
private Map<String, String> parameters;
private WorkflowExecutionStatus state;
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -271,6 +271,8 @@ public enum Status {
NOT_SUPPORT_SSO(10211, "Not support SSO login.", "不支持SSO登录"), NOT_SUPPORT_SSO(10211, "Not support SSO login.", "不支持SSO登录"),
STATE_CODE_ERROR(10212, "state inconsistency or state and code not pair", "状态码前后不一致或状态码和code不匹配"), STATE_CODE_ERROR(10212, "state inconsistency or state and code not pair", "状态码前后不一致或状态码和code不匹配"),
TASK_INSTANCE_NOT_DYNAMIC_TASK(10213, "task instance {0} is not dynamic", "任务实例[{0}]不是Dynamic类型"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"), RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest; import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
@ -135,6 +136,9 @@ public interface ProcessInstanceService {
long projectCode, long projectCode,
Integer taskId); Integer taskId);
List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser,
Integer taskId);
/** /**
* update process instance * update process instance
* *

68
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -30,6 +30,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest; import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
@ -44,6 +45,7 @@ import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.service.UsersService; import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
@ -57,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog; import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@ -66,6 +69,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
@ -93,6 +97,7 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -184,6 +189,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired @Autowired
private ScheduleMapper scheduleMapper; private ScheduleMapper scheduleMapper;
@Autowired
private RelationSubWorkflowMapper relationSubWorkflowMapper;
@Autowired @Autowired
private AlertDao alertDao; private AlertDao alertDao;
@ -488,6 +496,66 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result; return result;
} }
@Override
public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) {
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId);
Map<String, Object> result = new HashMap<>();
if (taskInstance == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskInstance.getTaskCode());
if (taskDefinition == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
}
if (!taskInstance.isDynamic()) {
putMsg(result, Status.TASK_INSTANCE_NOT_DYNAMIC_TASK, taskInstance.getName());
throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
}
List<RelationSubWorkflow> relationSubWorkflows = relationSubWorkflowMapper
.queryAllSubProcessInstance(Long.valueOf(taskInstance.getProcessInstanceId()),
taskInstance.getTaskCode());
List<Long> allSubProcessInstanceId = relationSubWorkflows.stream()
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList());
List<ProcessInstance> allSubWorkflows = processInstanceDao.queryBatchIds(allSubProcessInstanceId);
if (allSubWorkflows == null || allSubWorkflows.isEmpty()) {
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
throw new ServiceException(Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
}
Long subWorkflowCode = allSubWorkflows.get(0).getProcessDefinitionCode();
int subWorkflowVersion = allSubWorkflows.get(0).getProcessDefinitionVersion();
ProcessDefinition subProcessDefinition =
processService.findProcessDefinition(subWorkflowCode, subWorkflowVersion);
if (subProcessDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, subWorkflowCode);
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, subWorkflowCode);
}
allSubWorkflows.sort(Comparator.comparing(ProcessInstance::getId));
List<DynamicSubWorkflowDto> allDynamicSubWorkflowDtos = new ArrayList<>();
int index = 1;
for (ProcessInstance processInstance : allSubWorkflows) {
DynamicSubWorkflowDto dynamicSubWorkflowDto = new DynamicSubWorkflowDto();
dynamicSubWorkflowDto.setProcessInstanceId(processInstance.getId());
dynamicSubWorkflowDto.setIndex(index);
dynamicSubWorkflowDto.setState(processInstance.getState());
dynamicSubWorkflowDto.setName(subProcessDefinition.getName());
Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
String parameter = commandParamMap.get(CommandKeyConstants.CMD_DYNAMIC_START_PARAMS);
dynamicSubWorkflowDto.setParameters(JSONUtils.toMap(parameter));
allDynamicSubWorkflowDtos.add(dynamicSubWorkflowDto);
index++;
}
return allDynamicSubWorkflowDtos;
}
/** /**
* add dependent result for dependent task * add dependent result for dependent task
*/ */

1
dolphinscheduler-api/src/main/resources/task-type-config.yaml

@ -40,6 +40,7 @@ task:
- 'DEPENDENT' - 'DEPENDENT'
- 'CONDITIONS' - 'CONDITIONS'
- 'SWITCH' - 'SWITCH'
- 'DYNAMIC'
dataIntegration: dataIntegration:
- 'SEATUNNEL' - 'SEATUNNEL'
- 'DATAX' - 'DATAX'

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java

@ -45,6 +45,8 @@ public class CommandKeyConstants {
public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams"; public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
public static final String CMD_DYNAMIC_START_PARAMS = "dynamicParams";
/** /**
* complement data start date * complement data start date
*/ */

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java

@ -56,6 +56,7 @@ public enum CommandType {
RECOVER_WAITING_THREAD(10, "recover waiting thread"), RECOVER_WAITING_THREAD(10, "recover waiting thread"),
RECOVER_SERIAL_WAIT(11, "recover serial wait"), RECOVER_SERIAL_WAIT(11, "recover serial wait"),
EXECUTE_TASK(12, "start a task node in a process instance"), EXECUTE_TASK(12, "start a task node in a process instance"),
DYNAMIC_GENERATION(13, "dynamic generation"),
; ;
CommandType(int code, String descp) { CommandType(int code, String descp) {

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java

@ -40,6 +40,7 @@ public enum WorkflowExecutionStatus {
SERIAL_WAIT(14, "serial wait"), SERIAL_WAIT(14, "serial wait"),
READY_BLOCK(15, "ready block"), READY_BLOCK(15, "ready block"),
BLOCK(16, "block"), BLOCK(16, "block"),
WAIT_TO_RUN(17, "wait to run"),
; ;
private static final Map<Integer, WorkflowExecutionStatus> CODE_MAP = new HashMap<>(); private static final Map<Integer, WorkflowExecutionStatus> CODE_MAP = new HashMap<>();

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java

@ -59,6 +59,7 @@ public class BusinessTimeUtils {
case RECOVER_SUSPENDED_PROCESS: case RECOVER_SUSPENDED_PROCESS:
case START_FAILURE_TASK_PROCESS: case START_FAILURE_TASK_PROCESS:
case REPEAT_RUNNING: case REPEAT_RUNNING:
case DYNAMIC_GENERATION:
case SCHEDULER: case SCHEDULER:
default: default:
businessDate = addDays(new Date(), -1); businessDate = addDays(new Date(), -1);

50
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import lombok.Data;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@TableName("t_ds_relation_sub_workflow")
public class RelationSubWorkflow {
/**
* id
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* parent process instance id
*/
private Long parentWorkflowInstanceId;
/**
* parent task instance id
*/
private Long parentTaskCode;
/**
* process instance id
*/
private Long subWorkflowInstanceId;
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SEC_2_MINUT
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DYNAMIC;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
@ -357,6 +358,10 @@ public class TaskInstance implements Serializable {
return TASK_TYPE_DEPENDENT.equalsIgnoreCase(this.taskType); return TASK_TYPE_DEPENDENT.equalsIgnoreCase(this.taskType);
} }
public boolean isDynamic() {
return TASK_TYPE_DYNAMIC.equalsIgnoreCase(this.taskType);
}
public boolean isConditionsTask() { public boolean isConditionsTask() {
return TASK_TYPE_CONDITIONS.equalsIgnoreCase(this.taskType); return TASK_TYPE_CONDITIONS.equalsIgnoreCase(this.taskType);
} }

40
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* process instance map mapper interface
*/
public interface RelationSubWorkflowMapper extends BaseMapper<RelationSubWorkflow> {
int batchInsert(@Param("relationSubWorkflows") List<RelationSubWorkflow> relationSubWorkflows);
List<RelationSubWorkflow> queryAllSubProcessInstance(@Param("parentWorkflowInstanceId") Long parentWorkflowInstanceId,
@Param("parentTaskCode") Long parentTaskCode);
RelationSubWorkflow queryParentWorkflowInstance(@Param("subWorkflowInstanceId") Long subWorkflowInstanceId);
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

@ -38,6 +38,8 @@ public interface ProcessInstanceDao {
*/ */
public int upsertProcessInstance(ProcessInstance processInstance); public int upsertProcessInstance(ProcessInstance processInstance);
List<ProcessInstance> queryBatchIds(List<Long> processInstanceIds);
void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds); void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
void deleteById(Integer workflowInstanceId); void deleteById(Integer workflowInstanceId);

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -65,6 +66,14 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
} }
} }
@Override
public List<ProcessInstance> queryBatchIds(List<Long> processInstanceIds) {
if (CollectionUtils.isEmpty(processInstanceIds)) {
return new ArrayList<>();
}
return processInstanceMapper.selectBatchIds(processInstanceIds);
}
@Override @Override
public void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds) { public void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds) {
if (CollectionUtils.isEmpty(needToDeleteWorkflowInstanceIds)) { if (CollectionUtils.isEmpty(needToDeleteWorkflowInstanceIds)) {

44
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper">
<sql id="baseSql">
id, parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id
</sql>
<insert id="batchInsert">
insert into t_ds_relation_sub_workflow (parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id)
values
<foreach collection="relationSubWorkflows" item="relationSubWorkflow" separator=",">
(#{relationSubWorkflow.parentWorkflowInstanceId}, #{relationSubWorkflow.parentTaskCode}, #{relationSubWorkflow.subWorkflowInstanceId})
</foreach>
</insert>
<select id="queryAllSubProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow">
select
id, parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id
FROM t_ds_relation_sub_workflow
WHERE parent_workflow_instance_id = #{parentWorkflowInstanceId}
AND parent_task_code = #{parentTaskCode}
</select>
<select id="queryParentWorkflowInstance" resultType="org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow">
select
id, parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id
FROM t_ds_relation_sub_workflow
WHERE sub_workflow_instance_id = #{subWorkflowInstanceId}
</select>
</mapper>

13
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -2042,3 +2042,16 @@ CREATE TABLE t_ds_trigger_relation
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE KEY t_ds_trigger_relation_UN(trigger_type,job_id,trigger_code) UNIQUE KEY t_ds_trigger_relation_UN(trigger_type,job_id,trigger_code)
); );
DROP TABLE IF EXISTS t_ds_relation_sub_workflow;
CREATE TABLE t_ds_relation_sub_workflow (
id BIGINT AUTO_INCREMENT NOT NULL,
parent_workflow_instance_id BIGINT NOT NULL,
parent_task_code BIGINT NOT NULL,
sub_workflow_instance_id BIGINT NOT NULL,
PRIMARY KEY (id),
INDEX idx_parent_workflow_instance_id (parent_workflow_instance_id),
INDEX idx_parent_task_code (parent_task_code),
INDEX idx_sub_workflow_instance_id (sub_workflow_instance_id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

13
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -2016,3 +2016,16 @@ CREATE TABLE `t_ds_trigger_relation` (
KEY `t_ds_trigger_relation_trigger_code_IDX` (`trigger_code`), KEY `t_ds_trigger_relation_trigger_code_IDX` (`trigger_code`),
UNIQUE KEY `t_ds_trigger_relation_UN` (`trigger_type`,`job_id`,`trigger_code`) UNIQUE KEY `t_ds_trigger_relation_UN` (`trigger_type`,`job_id`,`trigger_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
DROP TABLE IF EXISTS `t_ds_relation_sub_workflow`;
CREATE TABLE `t_ds_relation_sub_workflow` (
`id` bigint NOT NULL AUTO_INCREMENT,
`parent_workflow_instance_id` bigint NOT NULL,
`parent_task_code` bigint NOT NULL,
`sub_workflow_instance_id` bigint NOT NULL,
PRIMARY KEY (`id`),
KEY `idx_parent_workflow_instance_id` (`parent_workflow_instance_id`),
KEY `idx_parent_task_code` (`parent_task_code`),
KEY `idx_sub_workflow_instance_id` (`sub_workflow_instance_id`)
);

13
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -2001,3 +2001,16 @@ CREATE TABLE t_ds_trigger_relation (
PRIMARY KEY (id), PRIMARY KEY (id),
CONSTRAINT t_ds_trigger_relation_unique UNIQUE (trigger_type,job_id,trigger_code) CONSTRAINT t_ds_trigger_relation_unique UNIQUE (trigger_type,job_id,trigger_code)
); );
DROP TABLE IF EXISTS t_ds_relation_sub_workflow;
CREATE TABLE t_ds_relation_sub_workflow (
id serial NOT NULL,
parent_workflow_instance_id BIGINT NOT NULL,
parent_task_code BIGINT NOT NULL,
sub_workflow_instance_id BIGINT NOT NULL,
PRIMARY KEY (id)
);
CREATE INDEX idx_parent_workflow_instance_id ON t_ds_relation_sub_workflow (parent_workflow_instance_id);
CREATE INDEX idx_parent_task_code ON t_ds_relation_sub_workflow (parent_task_code);
CREATE INDEX idx_sub_workflow_instance_id ON t_ds_relation_sub_workflow (sub_workflow_instance_id);

24
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql

@ -412,3 +412,27 @@ d//
delimiter ; delimiter ;
CALL add_improvement_workflow_run_tenant; CALL add_improvement_workflow_run_tenant;
DROP PROCEDURE add_improvement_workflow_run_tenant; DROP PROCEDURE add_improvement_workflow_run_tenant;
-- uc_dolphin_T_t_ds_relation_sub_workflow
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_relation_sub_workflow;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_relation_sub_workflow()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_relation_sub_workflow'
AND TABLE_SCHEMA=(SELECT DATABASE()))
THEN
CREATE TABLE `t_ds_relation_sub_workflow` (
`id` bigint NOT NULL AUTO_INCREMENT,
`parent_workflow_instance_id` bigint NOT NULL,
`parent_task_code` bigint NOT NULL,
`sub_workflow_instance_id` bigint NOT NULL,
PRIMARY KEY (`id`),
KEY `idx_parent_workflow_instance_id` (`parent_workflow_instance_id`),
KEY `idx_parent_task_code` (`parent_task_code`),
KEY `idx_sub_workflow_instance_id` (`sub_workflow_instance_id`)
);
END IF;
END;
d//

24
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -315,3 +315,27 @@ d//
delimiter ; delimiter ;
select add_improvement_workflow_run_tenant(); select add_improvement_workflow_run_tenant();
DROP FUNCTION add_improvement_workflow_run_tenant(); DROP FUNCTION add_improvement_workflow_run_tenant();
-- uc_dolphin_T_t_ds_relation_sub_workflow
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_relation_sub_workflow()
RETURNS VOID AS $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_name='t_ds_relation_sub_workflow'
AND table_schema=current_schema()
) THEN
CREATE TABLE t_ds_relation_sub_workflow (
id serial NOT NULL,
parent_workflow_instance_id BIGINT NOT NULL,
parent_task_code BIGINT NOT NULL,
sub_workflow_instance_id BIGINT NOT NULL,
PRIMARY KEY (id)
);
CREATE INDEX idx_parent_workflow_instance_id ON t_ds_relation_sub_workflow (parent_workflow_instance_id);
CREATE INDEX idx_parent_task_code ON t_ds_relation_sub_workflow (parent_task_code);
CREATE INDEX idx_sub_workflow_instance_id ON t_ds_relation_sub_workflow (sub_workflow_instance_id);
END IF;
END;
$$ LANGUAGE plpgsql;

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java

@ -37,7 +37,7 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler {
@Override @Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
StateEvent stateEvent) throws StateEventHandleError { StateEvent stateEvent) throws StateEventHandleError, StateEventHandleException {
TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent; TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
TaskMetrics.incTaskInstanceByState("timeout"); TaskMetrics.incTaskInstanceByState("timeout");
@ -62,6 +62,7 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler {
|| TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) { || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) {
if (taskExecuteRunnableMap.containsKey(taskInstance.getTaskCode())) { if (taskExecuteRunnableMap.containsKey(taskInstance.getTaskCode())) {
taskExecuteRunnableMap.get(taskInstance.getTaskCode()).timeout(); taskExecuteRunnableMap.get(taskInstance.getTaskCode()).timeout();
workflowExecuteRunnable.taskFinished(taskInstance);
} else { } else {
log.warn( log.warn(
"cannot find the task processor for task {}, so skip task processor action.", "cannot find the task processor for task {}, so skip task processor action.",

24
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -445,8 +445,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
if (taskInstance.getState().isSuccess()) { if (taskInstance.getState().isSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// todo: merge the last taskInstance mergeTaskInstanceVarPool(taskInstance);
processInstance.setVarPool(taskInstance.getVarPool());
processInstanceDao.upsertProcessInstance(processInstance); processInstanceDao.upsertProcessInstance(processInstance);
// save the cacheKey only if the task is defined as cache task and the task is success // save the cacheKey only if the task is defined as cache task and the task is success
if (taskInstance.getIsCache().equals(Flag.YES)) { if (taskInstance.getIsCache().equals(Flag.YES)) {
@ -2212,4 +2211,25 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
private boolean isExecutedOnMaster(String host) { private boolean isExecutedOnMaster(String host) {
return host.endsWith(masterAddress.split(Constants.COLON)[1]); return host.endsWith(masterAddress.split(Constants.COLON)[1]);
} }
private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
String taskVarPoolJson = taskInstance.getVarPool();
if (StringUtils.isEmpty(taskVarPoolJson)) {
return;
}
String processVarPoolJson = processInstance.getVarPool();
if (StringUtils.isEmpty(processVarPoolJson)) {
processInstance.setVarPool(taskVarPoolJson);
return;
}
List<Property> processVarPool = new ArrayList<>(JSONUtils.toList(processVarPoolJson, Property.class));
List<Property> taskVarPool = JSONUtils.toList(taskVarPoolJson, Property.class);
Set<String> newProcessVarPoolKeys = taskVarPool.stream().map(Property::getProp).collect(Collectors.toSet());
processVarPool = processVarPool.stream().filter(property -> !newProcessVarPoolKeys.contains(property.getProp()))
.collect(Collectors.toList());
processVarPool.addAll(taskVarPool);
processInstance.setVarPool(JSONUtils.toJsonString(processVarPool));
}
} }

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner.execute; package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask;
import java.util.Set; import java.util.Set;
@ -38,7 +39,8 @@ public class MasterTaskExecuteRunnableFactoryBuilder {
private static final Set<String> ASYNC_TASK_TYPE = Sets.newHashSet( private static final Set<String> ASYNC_TASK_TYPE = Sets.newHashSet(
DependentLogicTask.TASK_TYPE, DependentLogicTask.TASK_TYPE,
SubWorkflowLogicTask.TASK_TYPE); SubWorkflowLogicTask.TASK_TYPE,
DynamicLogicTask.TASK_TYPE);
public MasterDelayTaskExecuteRunnableFactory<? extends MasterDelayTaskExecuteRunnable> createWorkerDelayTaskExecuteRunnableFactory(String taskType) { public MasterDelayTaskExecuteRunnableFactory<? extends MasterDelayTaskExecuteRunnable> createWorkerDelayTaskExecuteRunnableFactory(String taskType) {
if (ASYNC_TASK_TYPE.contains(taskType)) { if (ASYNC_TASK_TYPE.contains(taskType)) {

178
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_DYNAMIC_START_PARAMS;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DynamicAsyncTaskExecuteFunction implements AsyncTaskExecuteFunction {
private static final Duration TASK_EXECUTE_STATE_CHECK_INTERVAL = Duration.ofSeconds(10);
private static final String OUTPUT_KEY = "dynamic.out";
private final ProcessInstance processInstance;
private final TaskInstance taskInstance;
private final SubWorkflowService subWorkflowService;
private final CommandMapper commandMapper;
private final int degreeOfParallelism;
private final DynamicLogicTask logicTask;
public DynamicAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionContext,
ProcessInstance processInstance,
TaskInstance taskInstance,
DynamicLogicTask dynamicLogicTask,
CommandMapper commandMapper,
SubWorkflowService subWorkflowService,
int degreeOfParallelism) {
this.processInstance = processInstance;
this.taskInstance = taskInstance;
this.logicTask = dynamicLogicTask;
this.degreeOfParallelism = degreeOfParallelism;
this.commandMapper = commandMapper;
this.subWorkflowService = subWorkflowService;
}
@Override
public @NonNull AsyncTaskExecutionStatus getAsyncTaskExecutionStatus() {
List<ProcessInstance> allSubProcessInstance = getAllSubProcessInstance();
int totalSubProcessInstanceCount = allSubProcessInstance.size();
List<ProcessInstance> finishedSubProcessInstance =
subWorkflowService.filterFinishProcessInstances(allSubProcessInstance);
if (finishedSubProcessInstance.size() == totalSubProcessInstanceCount) {
log.info("all sub process instance finish");
int successCount = subWorkflowService.filterSuccessProcessInstances(finishedSubProcessInstance).size();
log.info("success sub process instance count: {}", successCount);
if (successCount == totalSubProcessInstanceCount) {
log.info("all sub process instance success");
setOutputParameters();
return AsyncTaskExecutionStatus.SUCCESS;
} else {
int failedCount = totalSubProcessInstanceCount - successCount;
log.info("failed sub process instance count: {}", failedCount);
return AsyncTaskExecutionStatus.FAILED;
}
}
if (logicTask.isCancel()) {
return AsyncTaskExecutionStatus.FAILED;
}
int runningCount = subWorkflowService.filterRunningProcessInstances(allSubProcessInstance).size();
int startCount = degreeOfParallelism - runningCount;
if (startCount > 0) {
log.info("There are {} sub process instances that can be started", startCount);
startSubProcessInstances(allSubProcessInstance, startCount);
}
// query the status of sub workflow instance
return AsyncTaskExecutionStatus.RUNNING;
}
private void setOutputParameters() {
log.info("set varPool");
List<ProcessInstance> allSubProcessInstance = getAllSubProcessInstance();
List<DynamicOutput> dynamicOutputs = new ArrayList<>();
int index = 1;
for (ProcessInstance processInstance : allSubProcessInstance) {
DynamicOutput dynamicOutput = new DynamicOutput();
Map<String, String> dynamicParams =
JSONUtils.toMap(JSONUtils.toMap(processInstance.getCommandParam()).get(CMD_DYNAMIC_START_PARAMS));
dynamicOutput.setDynParams(dynamicParams);
Map<String, String> outputValueMap = new HashMap<>();
List<Property> propertyList = subWorkflowService.getWorkflowOutputParameters(processInstance);
for (Property property : propertyList) {
outputValueMap.put(property.getProp(), property.getValue());
}
dynamicOutput.setOutputValue(outputValueMap);
dynamicOutput.setMappedTimes(index++);
dynamicOutputs.add(dynamicOutput);
}
Property property = new Property();
property.setProp(String.format("%s(%s)", OUTPUT_KEY, taskInstance.getName()));
property.setDirect(Direct.OUT);
property.setType(DataType.VARCHAR);
property.setValue(JSONUtils.toJsonString(dynamicOutputs));
List<Property> taskPropertyList = new ArrayList<>(JSONUtils.toList(taskInstance.getVarPool(), Property.class));
taskPropertyList.add(property);
logicTask.getTaskParameters().setVarPool(JSONUtils.toJsonString(taskPropertyList));
log.info("set property: {}", property);
}
private void startSubProcessInstances(List<ProcessInstance> allSubProcessInstance, int startCount) {
List<ProcessInstance> waitingProcessInstances =
subWorkflowService.filterWaitToRunProcessInstances(allSubProcessInstance);
for (int i = 0; i < Math.min(startCount, waitingProcessInstances.size()); i++) {
ProcessInstance subProcessInstance = waitingProcessInstances.get(i);
Map<String, String> parameters = JSONUtils.toMap(DynamicCommandUtils
.getDataFromCommandParam(subProcessInstance.getCommandParam(), CMD_DYNAMIC_START_PARAMS));
Command command = DynamicCommandUtils.createCommand(this.processInstance,
subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion(),
parameters);
command.setProcessInstanceId(subProcessInstance.getId());
commandMapper.insert(command);
log.info("start sub process instance, sub process instance id: {}, command: {}", subProcessInstance.getId(),
command);
}
}
public List<ProcessInstance> getAllSubProcessInstance() {
return subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode());
}
@Override
public @NonNull Duration getAsyncTaskStateCheckInterval() {
return TASK_EXECUTE_STATE_CHECK_INTERVAL;
}
}

85
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
public class DynamicCommandUtils {
static public Command createCommand(ProcessInstance processInstance,
Long subProcessDefinitionCode,
Integer subProcessDefinitionVersion,
Map<String, String> parameters) {
Command command = new Command();
if (processInstance.getCommandType().equals(CommandType.START_PROCESS)) {
command.setCommandType(CommandType.DYNAMIC_GENERATION);
} else {
command.setCommandType(processInstance.getCommandType());
}
command.setProcessDefinitionCode(subProcessDefinitionCode);
command.setProcessDefinitionVersion(subProcessDefinitionVersion);
command.setTaskDependType(TaskDependType.TASK_POST);
command.setFailureStrategy(processInstance.getFailureStrategy());
command.setWarningType(processInstance.getWarningType());
String globalParams = processInstance.getGlobalParams();
if (StringUtils.isNotEmpty(globalParams)) {
List<Property> parentParams = Lists.newArrayList(JSONUtils.toList(globalParams, Property.class));
for (Property parentParam : parentParams) {
parameters.put(parentParam.getProp(), parentParam.getValue());
}
}
addDataToCommandParam(command, CommandKeyConstants.CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(parameters));
command.setExecutorId(processInstance.getExecutorId());
command.setWarningGroupId(processInstance.getWarningGroupId());
command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
command.setWorkerGroup(processInstance.getWorkerGroup());
command.setDryRun(processInstance.getDryRun());
return command;
}
static public String getDataFromCommandParam(String commandParam, String key) {
Map<String, String> cmdParam = JSONUtils.toMap(commandParam);
return cmdParam.get(key);
}
static void addDataToCommandParam(Command command, String key, String data) {
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
if (cmdParam == null) {
cmdParam = new HashMap<>();
}
cmdParam.put(key, data);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
}
}

309
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java

@ -0,0 +1,309 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
@Slf4j
public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {
public static final String TASK_TYPE = "DYNAMIC";
private final ProcessInstanceDao processInstanceDao;
private final SubWorkflowService subWorkflowService;
private final ProcessDefinitionMapper processDefineMapper;
private final CommandMapper commandMapper;
private final ProcessService processService;
private ProcessInstance processInstance;
private TaskInstance taskInstance;
private final MasterRpcClient masterRpcClient;
private boolean haveBeenCanceled = false;
public DynamicLogicTask(TaskExecutionContext taskExecutionContext,
ProcessInstanceDao processInstanceDao,
TaskInstanceDao taskInstanceDao,
SubWorkflowService subWorkflowService,
ProcessService processService,
MasterRpcClient masterRpcClient,
ProcessDefinitionMapper processDefineMapper,
CommandMapper commandMapper) {
super(taskExecutionContext,
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), new TypeReference<DynamicParameters>() {
}));
this.processInstanceDao = processInstanceDao;
this.subWorkflowService = subWorkflowService;
this.processService = processService;
this.masterRpcClient = masterRpcClient;
this.processDefineMapper = processDefineMapper;
this.commandMapper = commandMapper;
this.processInstance =
processInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId());
this.taskInstance = taskInstanceDao.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
}
@Override
public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws MasterTaskExecuteException {
List<Map<String, String>> parameterGroup = generateParameterGroup();
if (parameterGroup.size() > taskParameters.getMaxNumOfSubWorkflowInstances()) {
log.warn("the number of sub process instances [{}] exceeds the maximum limit [{}]", parameterGroup.size(),
taskParameters.getMaxNumOfSubWorkflowInstances());
parameterGroup = parameterGroup.subList(0, taskParameters.getMaxNumOfSubWorkflowInstances());
}
// if already exists sub process instance, do not generate again
List<ProcessInstance> existsSubProcessInstanceList =
subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode());
if (CollectionUtils.isEmpty(existsSubProcessInstanceList)) {
generateSubWorkflowInstance(parameterGroup);
} else {
resetProcessInstanceStatus(existsSubProcessInstanceList);
}
return new DynamicAsyncTaskExecuteFunction(taskExecutionContext, processInstance, taskInstance, this,
commandMapper,
subWorkflowService, taskParameters.getDegreeOfParallelism());
}
public void resetProcessInstanceStatus(List<ProcessInstance> existsSubProcessInstanceList) {
switch (processInstance.getCommandType()) {
case REPEAT_RUNNING:
existsSubProcessInstanceList.forEach(processInstance -> {
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
processInstanceDao.updateProcessInstance(processInstance);
});
break;
case START_FAILURE_TASK_PROCESS:
case RECOVER_TOLERANCE_FAULT_PROCESS:
List<ProcessInstance> failedProcessInstances =
subWorkflowService.filterFailedProcessInstances(existsSubProcessInstanceList);
failedProcessInstances.forEach(processInstance -> {
processInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
processInstanceDao.updateProcessInstance(processInstance);
});
break;
}
}
public void generateSubWorkflowInstance(List<Map<String, String>> parameterGroup) throws MasterTaskExecuteException {
List<ProcessInstance> processInstanceList = new ArrayList<>();
ProcessDefinition subProcessDefinition =
processDefineMapper.queryByCode(taskParameters.getProcessDefinitionCode());
for (Map<String, String> parameters : parameterGroup) {
String dynamicStartParams = JSONUtils.toJsonString(parameters);
Command command = DynamicCommandUtils.createCommand(processInstance, subProcessDefinition.getCode(),
subProcessDefinition.getVersion(), parameters);
// todo: set id to -1? we use command to generate sub process instance, but the generate method will use the
// command id to do
// somethings
command.setId(-1);
DynamicCommandUtils.addDataToCommandParam(command, CommandKeyConstants.CMD_DYNAMIC_START_PARAMS,
dynamicStartParams);
ProcessInstance subProcessInstance = createSubProcessInstance(command);
subProcessInstance.setState(WorkflowExecutionStatus.WAIT_TO_RUN);
processInstanceDao.insertProcessInstance(subProcessInstance);
command.setProcessInstanceId(subProcessInstance.getId());
processInstanceList.add(subProcessInstance);
}
List<RelationSubWorkflow> relationSubWorkflowList = new ArrayList<>();
for (ProcessInstance subProcessInstance : processInstanceList) {
RelationSubWorkflow relationSubWorkflow = new RelationSubWorkflow();
relationSubWorkflow.setParentWorkflowInstanceId(Long.valueOf(processInstance.getId()));
relationSubWorkflow.setParentTaskCode(taskInstance.getTaskCode());
relationSubWorkflow.setSubWorkflowInstanceId(Long.valueOf(subProcessInstance.getId()));
relationSubWorkflowList.add(relationSubWorkflow);
}
log.info("Expected number of runs : {}, actual number of runs : {}", parameterGroup.size(),
processInstanceList.size());
int insertN = subWorkflowService.batchInsertRelationSubWorkflow(relationSubWorkflowList);
log.info("insert {} relation sub workflow", insertN);
}
public ProcessInstance createSubProcessInstance(Command command) throws MasterTaskExecuteException {
ProcessInstance subProcessInstance;
try {
subProcessInstance = processService.constructProcessInstance(command, processInstance.getHost());
subProcessInstance.setIsSubProcess(Flag.YES);
subProcessInstance.setVarPool(taskExecutionContext.getVarPool());
} catch (Exception e) {
log.error("create sub process instance error", e);
throw new MasterTaskExecuteException(e.getMessage());
}
return subProcessInstance;
}
public List<Map<String, String>> generateParameterGroup() {
List<DynamicInputParameter> dynamicInputParameters = getDynamicInputParameters();
Set<String> filterStrings =
Arrays.stream(StringUtils.split(taskParameters.getFilterCondition(), ",")).map(String::trim)
.collect(Collectors.toSet());
List<List<DynamicInputParameter>> allParameters = new ArrayList<>();
for (DynamicInputParameter dynamicInputParameter : dynamicInputParameters) {
List<DynamicInputParameter> singleParameters = new ArrayList<>();
String value = dynamicInputParameter.getValue();
String separator = dynamicInputParameter.getSeparator();
List<String> valueList =
Arrays.stream(StringUtils.split(value, separator)).map(String::trim).collect(Collectors.toList());
valueList = valueList.stream().filter(v -> !filterStrings.contains(v)).collect(Collectors.toList());
for (String v : valueList) {
DynamicInputParameter singleParameter = new DynamicInputParameter();
singleParameter.setName(dynamicInputParameter.getName());
singleParameter.setValue(v);
singleParameters.add(singleParameter);
}
allParameters.add(singleParameters);
}
// use Sets.cartesianProduct to get the cartesian product of all parameters
List<List<DynamicInputParameter>> cartesianProduct = Lists.cartesianProduct(allParameters);
// convert cartesian product to parameter group List<Map<name:value>>
List<Map<String, String>> parameterGroup = cartesianProduct.stream().map(
inputParameterList -> inputParameterList.stream().collect(
Collectors.toMap(DynamicInputParameter::getName, DynamicInputParameter::getValue)))
.collect(Collectors.toList());
log.info("parameter group size: {}", parameterGroup.size());
// log every parameter group
if (CollectionUtils.isNotEmpty(parameterGroup)) {
for (Map<String, String> map : parameterGroup) {
log.info("parameter group: {}", map);
}
}
return parameterGroup;
}
private List<DynamicInputParameter> getDynamicInputParameters() {
List<DynamicInputParameter> dynamicInputParameters = taskParameters.getListParameters();
if (CollectionUtils.isNotEmpty(dynamicInputParameters)) {
for (DynamicInputParameter dynamicInputParameter : dynamicInputParameters) {
String value = dynamicInputParameter.getValue();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
value = ParameterUtils.convertParameterPlaceholders(value, ParamUtils.convert(paramsMap));
dynamicInputParameter.setValue(value);
}
}
return dynamicInputParameters;
}
@Override
public void kill() {
try {
changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP);
} catch (MasterTaskExecuteException e) {
log.error("kill {} error", taskInstance.getName(), e);
}
}
private void changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus) throws MasterTaskExecuteException {
this.haveBeenCanceled = true;
List<ProcessInstance> existsSubProcessInstanceList =
subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode());
List<ProcessInstance> runningSubProcessInstanceList =
subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList);
for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) {
subProcessInstance.setState(stopStatus);
processInstanceDao.updateProcessInstance(subProcessInstance);
if (subProcessInstance.getState().isFinished()) {
log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId());
return;
}
try {
sendToSubProcess(taskExecutionContext, subProcessInstance);
log.info("Success send [{}] request to SubWorkflow's master: {}", stopStatus,
subProcessInstance.getHost());
} catch (RemotingException e) {
throw new MasterTaskExecuteException(
String.format("Send stop request to SubWorkflow's master: %s failed",
subProcessInstance.getHost()),
e);
}
}
}
private void sendToSubProcess(TaskExecutionContext taskExecutionContext,
ProcessInstance subProcessInstance) throws RemotingException {
WorkflowStateEventChangeRequest stateEventChangeCommand = new WorkflowStateEventChangeRequest(
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId(),
subProcessInstance.getState(),
subProcessInstance.getId(),
0);
Host host = new Host(subProcessInstance.getHost());
masterRpcClient.send(host, stateEventChangeCommand.convert2Command());
}
public boolean isCancel() {
return haveBeenCanceled;
}
}

72
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskPluginFactory.java

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
import org.apache.dolphinscheduler.server.master.runner.task.ILogicTaskPluginFactory;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DynamicLogicTaskPluginFactory implements ILogicTaskPluginFactory<DynamicLogicTask> {
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private TaskInstanceDao taskInstanceDao;
@Autowired
private ProcessDefinitionMapper processDefineMapper;
@Autowired
private CommandMapper commandMapper;
@Autowired
private ProcessService processService;
@Autowired
SubWorkflowService subWorkflowService;
@Autowired
private MasterRpcClient masterRpcClient;
@Override
public DynamicLogicTask createLogicTask(TaskExecutionContext taskExecutionContext) {
return new DynamicLogicTask(taskExecutionContext, processInstanceDao, taskInstanceDao, subWorkflowService,
processService,
masterRpcClient, processDefineMapper, commandMapper);
}
@Override
public String getTaskType() {
return DynamicLogicTask.TASK_TYPE;
}
}

33
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicOutput.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
import java.util.Map;
import lombok.Data;
@Data
public class DynamicOutput {
private Map<String, String> dynParams;
private Map<String, String> outputValue;
private int mappedTimes;
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java

@ -123,8 +123,8 @@ public class SubWorkflowLogicTask extends BaseAsyncLogicTask<SubProcessParameter
log.info("TaskInstance is null"); log.info("TaskInstance is null");
return; return;
} }
if (taskInstance.getState().isFinished()) { if (subProcessInstance.getState().isFinished()) {
log.info("The task instance is finished, no need to pause"); log.info("The subProcessInstance is finished, no need to pause");
return; return;
} }
subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by kill task"); subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by kill task");

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskUtils.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils;
import org.apache.dolphinscheduler.server.master.runner.task.blocking.BlockingLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.blocking.BlockingLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.condition.ConditionLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.condition.ConditionLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask; import org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask;
@ -38,7 +39,8 @@ public class TaskUtils {
ConditionLogicTask.TASK_TYPE, ConditionLogicTask.TASK_TYPE,
DependentLogicTask.TASK_TYPE, DependentLogicTask.TASK_TYPE,
SubWorkflowLogicTask.TASK_TYPE, SubWorkflowLogicTask.TASK_TYPE,
SwitchLogicTask.TASK_TYPE); SwitchLogicTask.TASK_TYPE,
DynamicLogicTask.TASK_TYPE);
public boolean isMasterTask(String taskType) { public boolean isMasterTask(String taskType) {
return MASTER_TASK_TYPES.contains(taskType); return MASTER_TASK_TYPES.contains(taskType);

141
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunctionTest.java

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
import static org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction.AsyncTaskExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class DynamicAsyncTaskExecuteFunctionTest {
@Mock
private ProcessInstance processInstance;
@Mock
private TaskInstance taskInstance;
@Mock
private SubWorkflowService subWorkflowService;
@Mock
private CommandMapper commandMapper;
@Mock
private DynamicLogicTask dynamicLogicTask;
private DynamicAsyncTaskExecuteFunction function;
@BeforeEach
void setUp() {
processInstance = new ProcessInstance();
taskInstance = new TaskInstance();
processInstance.setId(1);
taskInstance.setTaskCode(2L);
function = new DynamicAsyncTaskExecuteFunction(
null,
processInstance,
taskInstance,
dynamicLogicTask,
commandMapper,
subWorkflowService,
0);
}
@Test
void shouldReturnSuccessWhenAllSubProcessInstancesFinishedSuccessfully() {
// Given
List<ProcessInstance> processInstances = Arrays.asList(Mockito.mock(ProcessInstance.class));
Mockito.when(processInstances.get(0).getCommandParam()).thenReturn("{}");
Mockito.when(subWorkflowService.getAllDynamicSubWorkflow(1, 2L)).thenReturn(processInstances);
Mockito.when(subWorkflowService.filterFinishProcessInstances(Mockito.any())).thenReturn(processInstances);
Mockito.when(subWorkflowService.filterSuccessProcessInstances(Mockito.any())).thenReturn(processInstances);
// When
DynamicParameters dynamicParameters = new DynamicParameters();
Mockito.when(dynamicLogicTask.getTaskParameters()).thenReturn(dynamicParameters);
AsyncTaskExecutionStatus status = function.getAsyncTaskExecutionStatus();
// Then
Assertions.assertEquals(AsyncTaskExecutionStatus.SUCCESS, status);
}
@Test
void shouldReturnFailedWhenSomeSubProcessInstancesFinishedUnsuccessfully() {
// Given
List<ProcessInstance> processInstances =
Arrays.asList(Mockito.mock(ProcessInstance.class), Mockito.mock(ProcessInstance.class));
Mockito.when(subWorkflowService.getAllDynamicSubWorkflow(1, 2L)).thenReturn(processInstances);
Mockito.when(subWorkflowService.filterFinishProcessInstances(Mockito.anyList())).thenReturn(processInstances);
Mockito.when(subWorkflowService.filterSuccessProcessInstances(Mockito.anyList()))
.thenReturn(Arrays.asList(processInstances.get(0)));
// When
AsyncTaskExecutionStatus status = function.getAsyncTaskExecutionStatus();
// Then
Assertions.assertEquals(AsyncTaskExecutionStatus.FAILED, status);
}
@Test
void shouldReturnRunningWhenSomeSubProcessInstancesAreRunning() {
// Given
List<ProcessInstance> processInstances = Arrays.asList(Mockito.mock(ProcessInstance.class));
Mockito.when(subWorkflowService.getAllDynamicSubWorkflow(1, 2L)).thenReturn(processInstances);
Mockito.when(subWorkflowService.filterFinishProcessInstances(Mockito.anyList())).thenReturn(Arrays.asList());
// When
AsyncTaskExecutionStatus status = function.getAsyncTaskExecutionStatus();
// Then
Assertions.assertEquals(AsyncTaskExecutionStatus.RUNNING, status);
}
@Test
void shouldReturnFailedWhenLogicTaskIsCancelled() {
// Given
List<ProcessInstance> processInstances = Arrays.asList(Mockito.mock(ProcessInstance.class));
Mockito.when(subWorkflowService.getAllDynamicSubWorkflow(1, 2L)).thenReturn(processInstances);
Mockito.when(dynamicLogicTask.isCancel()).thenReturn(true);
// When
AsyncTaskExecutionStatus status = function.getAsyncTaskExecutionStatus();
// Then
Assertions.assertEquals(AsyncTaskExecutionStatus.FAILED, status);
}
}

133
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtilsTest.java

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class DynamicCommandUtilsTest {
private ProcessInstance processInstance;
private Long subProcessDefinitionCode;
private Integer subProcessDefinitionVersion;
private Map<String, String> parameters;
@BeforeEach
void setUp() {
processInstance = new ProcessInstance();
subProcessDefinitionCode = 1L;
subProcessDefinitionVersion = 1;
parameters = new HashMap<>();
// Populate processInstance with some dummy data
processInstance.setCommandType(CommandType.START_PROCESS);
processInstance.setFailureStrategy(null); // update this
processInstance.setWarningType(null); // update this
processInstance.setGlobalParams("{\"prop\":\"value\"}");
processInstance.setExecutorId(1);
processInstance.setWarningGroupId(1);
processInstance.setProcessInstancePriority(null); // update this
processInstance.setWorkerGroup("worker");
processInstance.setDryRun(0);
}
@Test
void testCreateCommand() {
Command command = DynamicCommandUtils.createCommand(processInstance, subProcessDefinitionCode,
subProcessDefinitionVersion, parameters);
Assertions.assertEquals(CommandType.DYNAMIC_GENERATION, command.getCommandType());
Assertions.assertEquals(subProcessDefinitionCode, command.getProcessDefinitionCode());
Assertions.assertEquals(subProcessDefinitionVersion, command.getProcessDefinitionVersion());
Assertions.assertEquals(TaskDependType.TASK_POST, command.getTaskDependType());
Assertions.assertEquals(processInstance.getFailureStrategy(), command.getFailureStrategy());
Assertions.assertEquals(processInstance.getWarningType(), command.getWarningType());
Assertions.assertEquals(processInstance.getExecutorId(), command.getExecutorId());
Assertions.assertEquals(processInstance.getWarningGroupId(), command.getWarningGroupId());
Assertions.assertEquals(processInstance.getProcessInstancePriority(), command.getProcessInstancePriority());
Assertions.assertEquals(processInstance.getWorkerGroup(), command.getWorkerGroup());
Assertions.assertEquals(processInstance.getDryRun(), command.getDryRun());
}
@Test
void testGetDataFromCommandParam() {
Command command = new Command();
DynamicCommandUtils.addDataToCommandParam(command, "testKey", "testData");
String data = DynamicCommandUtils.getDataFromCommandParam(command.getCommandParam(), "testKey");
Assertions.assertEquals("testData", data);
}
@Test
void testCreateCommandCommandType() {
// Scenario 1: CommandType is START_PROCESS
processInstance.setCommandType(CommandType.START_PROCESS);
Command command1 = DynamicCommandUtils.createCommand(processInstance, subProcessDefinitionCode,
subProcessDefinitionVersion, parameters);
Assertions.assertEquals(CommandType.DYNAMIC_GENERATION, command1.getCommandType());
// Scenario 2: CommandType is not START_PROCESS
processInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Command command2 = DynamicCommandUtils.createCommand(processInstance, subProcessDefinitionCode,
subProcessDefinitionVersion, parameters);
Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command2.getCommandType());
}
@Test
void testCreateCommandStartParams() {
// Scenario: Add some data to parameters
parameters.put("testKey", "testValue");
Command command = DynamicCommandUtils.createCommand(processInstance, subProcessDefinitionCode,
subProcessDefinitionVersion, parameters);
String startParamsJson = DynamicCommandUtils.getDataFromCommandParam(command.getCommandParam(),
CommandKeyConstants.CMD_PARAM_START_PARAMS);
Map<String, String> startParams = JSONUtils.toMap(startParamsJson);
Assertions.assertEquals("testValue", startParams.get("testKey"));
}
@Test
void testCreateCommandGlobalParams() {
// Scenario: processInstance has globalParams
parameters.put("testKey", "testValue");
processInstance.setGlobalParams("[{\"prop\":\"globalKey\",\"value\":\"globalValue\"}]");
Command command = DynamicCommandUtils.createCommand(processInstance, subProcessDefinitionCode,
subProcessDefinitionVersion, parameters);
String startParamsJson = DynamicCommandUtils.getDataFromCommandParam(command.getCommandParam(),
CommandKeyConstants.CMD_PARAM_START_PARAMS);
Map<String, String> startParams = JSONUtils.toMap(startParamsJson);
Assertions.assertEquals("testValue", startParams.get("testKey"));
Assertions.assertEquals("globalValue", startParams.get("globalKey"));
}
}

185
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTaskTest.java

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
@ExtendWith(MockitoExtension.class)
class DynamicLogicTaskTest {
@Mock
private ProcessInstanceDao processInstanceDao;
@Mock
private TaskInstanceDao taskInstanceDao;
@Mock
private SubWorkflowService subWorkflowService;
@Mock
private ProcessService processService;
@Mock
private MasterRpcClient masterRpcClient;
@Mock
private ProcessDefinitionMapper processDefineMapper;
@Mock
private CommandMapper commandMapper;
private DynamicParameters dynamicParameters;
private ProcessInstance processInstance;
private TaskExecutionContext taskExecutionContext;
private DynamicLogicTask dynamicLogicTask;
private ObjectMapper objectMapper;
@BeforeEach
public void setUp() {
// Set up your test environment before each test.
dynamicParameters = new DynamicParameters();
taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
objectMapper = new ObjectMapper();
processInstance = new ProcessInstance();
Mockito.when(processInstanceDao.queryByWorkflowInstanceId(Mockito.any())).thenReturn(processInstance);
dynamicLogicTask = new DynamicLogicTask(
taskExecutionContext,
processInstanceDao,
taskInstanceDao,
subWorkflowService,
processService,
masterRpcClient,
processDefineMapper,
commandMapper);
}
@Test
void testGenerateParameterGroup() throws Exception {
DynamicInputParameter dynamicInputParameter1 = new DynamicInputParameter();
dynamicInputParameter1.setName("param1");
dynamicInputParameter1.setValue("a,b,c");
dynamicInputParameter1.setSeparator(",");
DynamicInputParameter dynamicInputParameter2 = new DynamicInputParameter();
dynamicInputParameter2.setName("param2");
dynamicInputParameter2.setValue("1. 2 . 3");
dynamicInputParameter2.setSeparator(".");
List<DynamicInputParameter> dynamicInputParameters =
Arrays.asList(dynamicInputParameter1, dynamicInputParameter2);
dynamicParameters.setListParameters(dynamicInputParameters);
dynamicParameters.setFilterCondition("b,2");
Mockito.when(taskExecutionContext.getPrepareParamsMap()).thenReturn(new HashMap<>());
Mockito.when(taskExecutionContext.getTaskParams())
.thenReturn(objectMapper.writeValueAsString(dynamicParameters));
dynamicLogicTask = new DynamicLogicTask(
taskExecutionContext,
processInstanceDao,
taskInstanceDao,
subWorkflowService,
processService,
masterRpcClient,
processDefineMapper,
commandMapper);
List<Map<String, String>> parameterGroup = dynamicLogicTask.generateParameterGroup();
Assertions.assertEquals(4, parameterGroup.size()); // expected cartesian product without filtered values is 6
// Assert the value of parameter groups. Adjust these according to your expectations.
// Here we only check for a few representative cases to keep the test concise.
Map<String, String> expectedMap1 = new HashMap<>();
expectedMap1.put("param1", "a");
expectedMap1.put("param2", "1");
Map<String, String> expectedMap2 = new HashMap<>();
expectedMap2.put("param1", "a");
expectedMap2.put("param2", "3");
Map<String, String> expectedMap3 = new HashMap<>();
expectedMap3.put("param1", "c");
expectedMap3.put("param2", "1");
Map<String, String> expectedMap4 = new HashMap<>();
expectedMap4.put("param1", "c");
expectedMap4.put("param2", "3");
assert (parameterGroup.containsAll(Arrays.asList(expectedMap1, expectedMap2, expectedMap3, expectedMap4)));
}
@Test
void testResetProcessInstanceStatus_RepeatRunning() {
processInstance.setCommandType(CommandType.REPEAT_RUNNING);
ProcessInstance subProcessInstance = new ProcessInstance();
List<ProcessInstance> subProcessInstances = Arrays.asList(subProcessInstance);
dynamicLogicTask.resetProcessInstanceStatus(subProcessInstances);
Mockito.verify(processInstanceDao).updateProcessInstance(subProcessInstance);
Assertions.assertEquals(WorkflowExecutionStatus.WAIT_TO_RUN, subProcessInstance.getState());
}
@Test
void testResetProcessInstanceStatus_StartFailureTaskProcess() {
processInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
ProcessInstance failedSubProcessInstance = new ProcessInstance();
failedSubProcessInstance.setState(WorkflowExecutionStatus.FAILURE);
List<ProcessInstance> subProcessInstances = Arrays.asList(failedSubProcessInstance);
Mockito.when(subWorkflowService.filterFailedProcessInstances(subProcessInstances))
.thenReturn(Arrays.asList(failedSubProcessInstance));
dynamicLogicTask.resetProcessInstanceStatus(subProcessInstances);
Mockito.verify(processInstanceDao).updateProcessInstance(failedSubProcessInstance);
Assertions.assertEquals(WorkflowExecutionStatus.WAIT_TO_RUN, failedSubProcessInstance.getState());
}
}

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -64,6 +64,9 @@ public interface ProcessService {
ProcessInstance handleCommand(String host, ProcessInstance handleCommand(String host,
Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException; Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException;
ProcessInstance constructProcessInstance(Command command,
String host) throws CronParseException, CodeGenerateUtils.CodeGenerateException;
Optional<ProcessInstance> findProcessInstanceDetailById(int processId); Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
ProcessInstance findProcessInstanceById(int processId); ProcessInstance findProcessInstanceById(int processId);

6
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -744,8 +744,8 @@ public class ProcessServiceImpl implements ProcessService {
* @param host host * @param host host
* @return process instance * @return process instance
*/ */
protected @Nullable ProcessInstance constructProcessInstance(Command command, public @Nullable ProcessInstance constructProcessInstance(Command command,
String host) throws CronParseException, CodeGenerateException { String host) throws CronParseException, CodeGenerateException {
ProcessInstance processInstance; ProcessInstance processInstance;
ProcessDefinition processDefinition; ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType(); CommandType commandType = command.getCommandType();
@ -765,6 +765,7 @@ public class ProcessServiceImpl implements ProcessService {
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam); processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
} else { } else {
processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null); processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
setGlobalParamIfCommanded(processDefinition, cmdParam);
if (processInstance == null) { if (processInstance == null) {
return null; return null;
} }
@ -816,6 +817,7 @@ public class ProcessServiceImpl implements ProcessService {
int runTime = processInstance.getRunTimes(); int runTime = processInstance.getRunTimes();
switch (commandType) { switch (commandType) {
case START_PROCESS: case START_PROCESS:
case DYNAMIC_GENERATION:
break; break;
case START_FAILURE_TASK_PROCESS: case START_FAILURE_TASK_PROCESS:
// find failed tasks and init these tasks // find failed tasks and init these tasks

46
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowService.java

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.subworkflow;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import java.util.List;
import org.springframework.stereotype.Component;
@Component
public interface SubWorkflowService {
List<ProcessInstance> getAllDynamicSubWorkflow(long processInstanceId, long taskCode);
int batchInsertRelationSubWorkflow(List<RelationSubWorkflow> relationSubWorkflowList);
List<ProcessInstance> filterFinishProcessInstances(List<ProcessInstance> processInstanceList);
List<ProcessInstance> filterSuccessProcessInstances(List<ProcessInstance> processInstanceList);
List<ProcessInstance> filterRunningProcessInstances(List<ProcessInstance> processInstanceList);
List<ProcessInstance> filterWaitToRunProcessInstances(List<ProcessInstance> processInstanceList);
List<ProcessInstance> filterFailedProcessInstances(List<ProcessInstance> processInstanceList);
List<Property> getWorkflowOutputParameters(ProcessInstance processInstance);
}

119
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/subworkflow/SubWorkflowServiceImpl.java

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.subworkflow;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SubWorkflowServiceImpl implements SubWorkflowService {
@Autowired
private RelationSubWorkflowMapper relationSubWorkflowMapper;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Override
public List<ProcessInstance> getAllDynamicSubWorkflow(long processInstanceId, long taskCode) {
List<RelationSubWorkflow> relationSubWorkflows =
relationSubWorkflowMapper.queryAllSubProcessInstance(processInstanceId, taskCode);
List<Long> allSubProcessInstanceId = relationSubWorkflows.stream()
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList());
List<ProcessInstance> allSubProcessInstance = processInstanceDao.queryBatchIds(allSubProcessInstanceId);
allSubProcessInstance.sort(Comparator.comparing(ProcessInstance::getId));
return allSubProcessInstance;
}
@Override
public int batchInsertRelationSubWorkflow(List<RelationSubWorkflow> relationSubWorkflowList) {
int insertN = relationSubWorkflowMapper.batchInsert(relationSubWorkflowList);
return insertN;
}
@Override
public List<ProcessInstance> filterFinishProcessInstances(List<ProcessInstance> processInstanceList) {
return processInstanceList.stream()
.filter(subProcessInstance -> subProcessInstance.getState().isFinished()).collect(Collectors.toList());
}
@Override
public List<ProcessInstance> filterSuccessProcessInstances(List<ProcessInstance> processInstanceList) {
return processInstanceList.stream()
.filter(subProcessInstance -> subProcessInstance.getState().isSuccess()).collect(Collectors.toList());
}
@Override
public List<ProcessInstance> filterRunningProcessInstances(List<ProcessInstance> processInstanceList) {
return processInstanceList.stream()
.filter(subProcessInstance -> subProcessInstance.getState().isRunning()).collect(Collectors.toList());
}
@Override
public List<ProcessInstance> filterWaitToRunProcessInstances(List<ProcessInstance> processInstanceList) {
return processInstanceList.stream()
.filter(subProcessInstance -> subProcessInstance.getState().equals(WorkflowExecutionStatus.WAIT_TO_RUN))
.collect(Collectors.toList());
}
@Override
public List<ProcessInstance> filterFailedProcessInstances(List<ProcessInstance> processInstanceList) {
return processInstanceList.stream()
.filter(subProcessInstance -> subProcessInstance.getState().isFailure()).collect(Collectors.toList());
}
@Override
public List<Property> getWorkflowOutputParameters(ProcessInstance processInstance) {
List<Property> outputParamList =
new ArrayList<>(JSONUtils.toList(processInstance.getVarPool(), Property.class));
ProcessDefinitionLog processDefinition = processDefinitionLogMapper
.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
List<Property> globalParamList = JSONUtils.toList(processDefinition.getGlobalParams(), Property.class);
Set<String> ouputParamSet = outputParamList.stream().map(Property::getProp).collect(Collectors.toSet());
// add output global parameters which are not in output parameters list
globalParamList.stream().filter(globalParam -> !ouputParamSet.contains(globalParam.getProp()))
.forEach(outputParamList::add);
return outputParamList;
}
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

@ -436,6 +436,8 @@ public class TaskConstants {
public static final String TASK_TYPE_SUB_PROCESS = "SUB_PROCESS"; public static final String TASK_TYPE_SUB_PROCESS = "SUB_PROCESS";
public static final String TASK_TYPE_DYNAMIC = "DYNAMIC";
public static final String TASK_TYPE_DEPENDENT = "DEPENDENT"; public static final String TASK_TYPE_DEPENDENT = "DEPENDENT";
public static final String TASK_TYPE_SQL = "SQL"; public static final String TASK_TYPE_SQL = "SQL";

3
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@ -102,6 +103,8 @@ public class TaskPluginManager {
return JSONUtils.parseObject(parametersNode.getTaskParams(), DependentParameters.class); return JSONUtils.parseObject(parametersNode.getTaskParams(), DependentParameters.class);
case TaskConstants.TASK_TYPE_BLOCKING: case TaskConstants.TASK_TYPE_BLOCKING:
return JSONUtils.parseObject(parametersNode.getTaskParams(), BlockingParameters.class); return JSONUtils.parseObject(parametersNode.getTaskParams(), BlockingParameters.class);
case TaskConstants.TASK_TYPE_DYNAMIC:
return JSONUtils.parseObject(parametersNode.getTaskParams(), DynamicParameters.class);
default: default:
TaskChannel taskChannel = this.getTaskChannelMap().get(taskType); TaskChannel taskChannel = this.getTaskChannelMap().get(taskType);
if (Objects.isNull(taskChannel)) { if (Objects.isNull(taskChannel)) {

33
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DynamicInputParameter.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
@Data
@NoArgsConstructor
public class DynamicInputParameter {
@NonNull
private String name;
@NonNull
private String value;
private String separator = ",";
}

53
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DynamicParameters.java

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.parameters;
import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
import java.util.List;
import lombok.Data;
@Data
public class DynamicParameters extends AbstractParameters {
/**
* process definition id
*/
private long processDefinitionCode;
private int maxNumOfSubWorkflowInstances;
private int degreeOfParallelism;
private String filterCondition;
private List<DynamicInputParameter> listParameters;
@Override
public boolean checkParameters() {
try {
if (listParameters == null || listParameters.isEmpty()) {
return false;
}
} catch (Exception e) {
return false;
}
return this.processDefinitionCode != 0;
}
}

BIN
dolphinscheduler-ui/public/images/task-icons/dynamic.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

BIN
dolphinscheduler-ui/public/images/task-icons/dynamic_hover.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

17
dolphinscheduler-ui/src/common/common.ts

@ -30,7 +30,8 @@ import {
StopOutlined, StopOutlined,
IssuesCloseOutlined, IssuesCloseOutlined,
SendOutlined, SendOutlined,
HistoryOutlined HistoryOutlined,
HourglassOutlined
} from '@vicons/antd' } from '@vicons/antd'
import { format, parseISO } from 'date-fns' import { format, parseISO } from 'date-fns'
import _ from 'lodash' import _ from 'lodash'
@ -123,6 +124,10 @@ export const runningType = (t: any) => [
{ {
desc: `${t('project.workflow.execute_task')}`, desc: `${t('project.workflow.execute_task')}`,
code: 'EXECUTE_TASK' code: 'EXECUTE_TASK'
},
{
desc: `${t('project.workflow.dynamic_generation')}`,
code: 'DYNAMIC_GENERATION'
} }
] ]
@ -379,7 +384,15 @@ export const workflowExecutionState = (
icon: HistoryOutlined, icon: HistoryOutlined,
isSpin: false, isSpin: false,
classNames: 'pending' classNames: 'pending'
} },
WAIT_TO_RUN: {
id: 18,
desc: `${t('project.overview.wait_to_run')}`,
color: '#5102ce',
icon: HourglassOutlined,
isSpin: false,
classNames: 'wait_to_run'
},
}) })
/** /**

1
dolphinscheduler-ui/src/common/types.ts

@ -41,6 +41,7 @@ export type IWorkflowExecutionState =
| 'SERIAL_WAIT' | 'SERIAL_WAIT'
| 'READY_BLOCK' | 'READY_BLOCK'
| 'BLOCK' | 'BLOCK'
| 'WAIT_TO_RUN'
export type ITaskStateConfig = { export type ITaskStateConfig = {
[key in ITaskState]: { [key in ITaskState]: {

12
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -169,6 +169,7 @@ export default {
recovery_waiting_thread: 'Recovery waiting thread', recovery_waiting_thread: 'Recovery waiting thread',
recover_serial_wait: 'Recover serial wait', recover_serial_wait: 'Recover serial wait',
execute_task: 'Execute the specified task', execute_task: 'Execute the specified task',
dynamic_generation: 'Dynamic Generation',
recovery_suspend: 'Recovery Suspend', recovery_suspend: 'Recovery Suspend',
recovery_failed: 'Recovery Failed', recovery_failed: 'Recovery Failed',
gantt: 'Gantt', gantt: 'Gantt',
@ -837,7 +838,16 @@ export default {
dependent_failure_policy_waiting: 'waiting', dependent_failure_policy_waiting: 'waiting',
dependent_failure_waiting_time: 'Dependent failure waiting time', dependent_failure_waiting_time: 'Dependent failure waiting time',
dependent_failure_waiting_time_tips: dependent_failure_waiting_time_tips:
'Failure waiting time must be a positive integer' 'Failure waiting time must be a positive integer',
max_num_of_sub_workflow_instances: 'max num of sub workflow instances',
filter_condition: 'Filter Condition',
params_value: 'Params Value',
separator: 'Separator',
dynamic_name_tips: 'name(required)',
dynamic_value_tips: 'params or value(required)',
dynamic_separator_tips: 'separator(required)',
child_node_definition: 'child node definition',
child_node_instance: 'child node instance',
}, },
menu: { menu: {
fav: 'Favorites', fav: 'Favorites',

12
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -170,6 +170,7 @@ export default {
recovery_waiting_thread: '恢复等待线程', recovery_waiting_thread: '恢复等待线程',
recover_serial_wait: '串行恢复', recover_serial_wait: '串行恢复',
execute_task: '执行指定任务', execute_task: '执行指定任务',
dynamic_generation: '动态生成',
recovery_suspend: '恢复运行', recovery_suspend: '恢复运行',
recovery_failed: '重跑失败任务', recovery_failed: '重跑失败任务',
gantt: '甘特图', gantt: '甘特图',
@ -813,7 +814,16 @@ export default {
dependent_failure_policy_failure: '失败', dependent_failure_policy_failure: '失败',
dependent_failure_policy_waiting: '等待', dependent_failure_policy_waiting: '等待',
dependent_failure_waiting_time: '依赖失败等待时间', dependent_failure_waiting_time: '依赖失败等待时间',
dependent_failure_waiting_time_tips: '失败等待时间必须为正整数' dependent_failure_waiting_time_tips: '失败等待时间必须为正整数',
max_num_of_sub_workflow_instances: '动态生成实例上限',
filter_condition: '过滤条件',
params_value: '取值参数',
separator: '分隔符',
dynamic_name_tips: 'name(必填)',
dynamic_value_tips: 'params or value(必填)',
dynamic_separator_tips: '分隔符(必填)',
child_node_definition: '子节点定义',
child_node_instance: '子节点实例',
}, },
menu: { menu: {
fav: '收藏组件', fav: '收藏组件',

3
dolphinscheduler-ui/src/store/project/task-type.ts

@ -32,6 +32,9 @@ export const TASK_TYPES_MAP = {
SUB_PROCESS: { SUB_PROCESS: {
alias: 'SUB_PROCESS' alias: 'SUB_PROCESS'
}, },
DYNAMIC: {
alias: 'DYNAMIC'
},
PROCEDURE: { PROCEDURE: {
alias: 'PROCEDURE' alias: 'PROCEDURE'
}, },

1
dolphinscheduler-ui/src/store/project/types.ts

@ -23,6 +23,7 @@ type TaskExecuteType = 'STREAM' | 'BATCH'
type TaskType = type TaskType =
| 'SHELL' | 'SHELL'
| 'SUB_PROCESS' | 'SUB_PROCESS'
| 'DYNAMIC'
| 'PROCEDURE' | 'PROCEDURE'
| 'SQL' | 'SQL'
| 'SPARK' | 'SPARK'

2
dolphinscheduler-ui/src/views/projects/task/components/node/detail-modal.tsx

@ -158,7 +158,7 @@ const NodeDetailModal = defineComponent({
}, },
{ {
text: t('project.node.enter_this_child_node'), text: t('project.node.enter_this_child_node'),
show: props.data.taskType === 'SUB_PROCESS', show: props.data.taskType === 'SUB_PROCESS' || props.data.taskType === 'DYNAMIC',
disabled: disabled:
!props.data.id || !props.data.id ||
(router.currentRoute.value.name === 'workflow-instance-detail' && (router.currentRoute.value.name === 'workflow-instance-detail' &&

1
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts

@ -89,3 +89,4 @@ export { useKubeflow } from './use-kubeflow'
export { useLinkis } from './use-linkis' export { useLinkis } from './use-linkis'
export { useDataFactory } from './use-data-factory' export { useDataFactory } from './use-data-factory'
export { useRemoteShell } from './use-remote-shell' export { useRemoteShell } from './use-remote-shell'
export { useDynamic } from './use-dynamic'

120
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dynamic.ts

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type { IJsonItem } from '../types'
import { useI18n } from 'vue-i18n'
export function useDynamic(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
return [
{
type: 'input-number',
field: 'maxNumOfSubWorkflowInstances',
span: 12,
name: t('project.node.max_num_of_sub_workflow_instances'),
validate: {
required: true
}
},
{
type: 'input-number',
field: 'degreeOfParallelism',
span: 12,
name: t('project.node.parallelism'),
validate: {
required: true
}
},
{
type: 'custom-parameters',
field: 'listParameters',
name: t('project.node.params_value'),
span: 24,
children: [
{
type: 'input',
field: 'name',
span: 8,
props: {
placeholder: t('project.node.dynamic_name_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.dynamic_name_tips'))
}
const sameItems = model['listParameters'].filter(
(item: { name: string }) => item.name === value
)
if (sameItems.length > 1) {
return new Error(t('project.node.prop_repeat'))
}
}
}
},
{
type: 'input',
field: 'value',
span: 8,
props: {
placeholder: t('project.node.dynamic_value_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.dynamic_value_tips'))
}
}
}
},
{
type: 'input',
field: 'separator',
span: 4,
props: {
placeholder: t('project.node.dynamic_separator_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.dynamic_separator_tips'))
}
}
}
}
]
},
{
type: 'input',
field: 'filterCondition',
span: 24,
name: t('project.node.filter_condition')
}
]
}

10
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -32,7 +32,7 @@ export function formatParams(data: INodeData): {
taskDefinitionJsonObj: object taskDefinitionJsonObj: object
} { } {
const taskParams: ITaskParams = {} const taskParams: ITaskParams = {}
if (data.taskType === 'SUB_PROCESS') { if (data.taskType === 'SUB_PROCESS' || data.taskType === 'DYNAMIC') {
taskParams.processDefinitionCode = data.processDefinitionCode taskParams.processDefinitionCode = data.processDefinitionCode
} }
@ -481,6 +481,14 @@ export function formatParams(data: INodeData): {
taskParams.datasource = data.datasource taskParams.datasource = data.datasource
} }
if (data.taskType === 'DYNAMIC') {
taskParams.processDefinitionCode = data.processDefinitionCode
taskParams.maxNumOfSubWorkflowInstances = data.maxNumOfSubWorkflowInstances
taskParams.degreeOfParallelism = data.degreeOfParallelism
taskParams.filterCondition = data.filterCondition
taskParams.listParameters = data.listParameters
}
let timeoutNotifyStrategy = '' let timeoutNotifyStrategy = ''
if (data.timeoutNotifyStrategy) { if (data.timeoutNotifyStrategy) {
if (data.timeoutNotifyStrategy.length === 1) { if (data.timeoutNotifyStrategy.length === 1) {

2
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts

@ -52,10 +52,12 @@ import { useKubeflow } from './use-kubeflow'
import { useLinkis } from './use-linkis' import { useLinkis } from './use-linkis'
import { useDataFactory } from './use-data-factory' import { useDataFactory } from './use-data-factory'
import { useRemoteShell } from './use-remote-shell' import { useRemoteShell } from './use-remote-shell'
import { useDynamic } from './use-dynamic'
export default { export default {
SHELL: useShell, SHELL: useShell,
SUB_PROCESS: useSubProcess, SUB_PROCESS: useSubProcess,
DYNAMIC: useDynamic,
PYTHON: usePython, PYTHON: usePython,
SPARK: useSpark, SPARK: useSpark,
MR: useMr, MR: useMr,

83
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dynamic.ts

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import { useRouter } from 'vue-router'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData } from '../types'
import { ITaskData } from '../types'
export function useDynamic({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const router = useRouter()
const workflowCode = router.currentRoute.value.params.code
const model = reactive({
taskType: 'DYNAMIC',
name: '',
flag: 'YES',
description: '',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
delayTime: 0,
timeout: 30,
maxNumOfSubWorkflowInstances: 1024,
degreeOfParallelism: 1,
filterCondition: '',
listParameters: [{ name: null, value: null, separator: ',' }]
} as INodeData)
if (model.listParameters?.length) {
model.listParameters[0].disabled = true
}
return {
json: [
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useTimeoutAlarm(model),
Fields.useChildNode({
model,
projectCode,
from,
processName: data?.processName,
code: from === 1 ? 0 : Number(workflowCode)
}),
...Fields.useDynamic(model),
Fields.usePreTasks()
] as IJsonItem[],
model
}
}

4
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -436,6 +436,10 @@ interface ITaskParams {
factoryName?: string factoryName?: string
resourceGroupName?: string resourceGroupName?: string
pipelineName?: string pipelineName?: string
maxNumOfSubWorkflowInstances?: number
degreeOfParallelism?: number
filterCondition?: string
listParameters?: Array<any>
} }
interface INodeData interface INodeData

4
dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts

@ -17,6 +17,7 @@
export type TaskType = export type TaskType =
| 'SHELL' | 'SHELL'
| 'SUB_PROCESS' | 'SUB_PROCESS'
| 'DYNAMIC'
| 'PROCEDURE' | 'PROCEDURE'
| 'SQL' | 'SQL'
| 'SPARK' | 'SPARK'
@ -65,6 +66,9 @@ export const TASK_TYPES_MAP = {
SUB_PROCESS: { SUB_PROCESS: {
alias: 'SUB_PROCESS' alias: 'SUB_PROCESS'
}, },
DYNAMIC: {
alias: 'DYNAMIC'
},
PROCEDURE: { PROCEDURE: {
alias: 'PROCEDURE' alias: 'PROCEDURE'
}, },

6
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

@ -107,6 +107,9 @@ $bgLight: #ffffff;
&.icon-sub_process { &.icon-sub_process {
background-image: url('/images/task-icons/sub_process.png'); background-image: url('/images/task-icons/sub_process.png');
} }
&.icon-dynamic {
background-image: url('/images/task-icons/dynamic.png');
}
&.icon-data_quality { &.icon-data_quality {
background-image: url('/images/task-icons/data_quality.png'); background-image: url('/images/task-icons/data_quality.png');
} }
@ -220,6 +223,9 @@ $bgLight: #ffffff;
&.icon-sub_process { &.icon-sub_process {
background-image: url('/images/task-icons/sub_process_hover.png'); background-image: url('/images/task-icons/sub_process_hover.png');
} }
&.icon-dynamic {
background-image: url('/images/task-icons/dynamic_hover.png');
}
&.icon-data_quality { &.icon-data_quality {
background-image: url('/images/task-icons/data_quality_hover.png'); background-image: url('/images/task-icons/data_quality_hover.png');
} }

Loading…
Cancel
Save