diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index 17ee92367f..6c63f157df 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -101,6 +101,10 @@ export default {
title: 'SubProcess',
link: '/en-us/docs/dev/user_doc/guide/task/sub-process.html',
},
+ {
+ title: 'Dynamic',
+ link: '/en-us/docs/dev/user_doc/guide/task/dynamic.html',
+ },
{
title: 'Dependent',
link: '/en-us/docs/dev/user_doc/guide/task/dependent.html',
@@ -810,6 +814,10 @@ export default {
title: 'SubProcess',
link: '/zh-cn/docs/dev/user_doc/guide/task/sub-process.html',
},
+ {
+ title: 'Dynamic',
+ link: '/zh-cn/docs/dev/user_doc/guide/task/dynamic.html',
+ },
{
title: 'Dependent',
link: '/zh-cn/docs/dev/user_doc/guide/task/dependent.html',
diff --git a/docs/docs/en/guide/task/dynamic.md b/docs/docs/en/guide/task/dynamic.md
new file mode 100644
index 0000000000..b304f78ee7
--- /dev/null
+++ b/docs/docs/en/guide/task/dynamic.md
@@ -0,0 +1,77 @@
+# Dynamic Task
+
+## Overview
+
+The Dynamic task accepts multiple parameter lists, computes all parameter combinations via the Cartesian product, and then executes each combination as a sub-workflow node.
+
+For example, suppose we have a workflow with two input parameters, a and b.
+
+We can use the dynamic node to wrap this workflow definition as a single node and then enter the parameter lists:
+
+- Parameter a: a1, a2
+- Parameter b: b1, b2
+
+The dynamic node then calculates four parameter combinations:
+
+- a1, b1
+- a1, b2
+- a2, b1
+- a2, b2
+
+These four combinations are then used as the startup parameters of the sub-workflow node, so a total of four sub-workflow instances are generated.
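+
+The combinations are simply the Cartesian product of the input lists. Below is a minimal, illustrative Java sketch of how such combinations can be enumerated (this is not the scheduler's internal implementation):
+
+```java
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CartesianProductSketch {
+
+    public static void main(String[] args) {
+        List<String> a = Arrays.asList("a1", "a2");
+        List<String> b = Arrays.asList("b1", "b2");
+
+        // Each combination becomes the start parameters of one sub-workflow instance.
+        List<Map<String, String>> combinations = new ArrayList<>();
+        for (String valueA : a) {
+            for (String valueB : b) {
+                Map<String, String> combination = new LinkedHashMap<>();
+                combination.put("a", valueA);
+                combination.put("b", valueB);
+                combinations.add(combination);
+            }
+        }
+        // Prints {a=a1, b=b1}, {a=a1, b=b2}, {a=a2, b=b1}, {a=a2, b=b2}
+        combinations.forEach(System.out::println);
+    }
+}
+```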
+
+## Create Task
+
+- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the
+  DAG editing page.
+- Drag the Dynamic task node from the toolbar onto the canvas.
+
+The task definition is shown in the following figure:
+
+![dynamic_definition](../../../../img/tasks/demo/dynamic_definition.png)
+
+## Task Parameters
+
+[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
+[//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md#default-task-parameters) `Default Task Parameters` section for default parameters.)
+
+- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
+
+| **Task Parameters** | **Description** |
+|---------------------|-----------------|
+| Child Node | The workflow definition to run as the sub-workflow. You can jump to the workflow definition of the selected sub-workflow through the enter-sub-node control in the upper right corner. |
+| max num of sub workflow instances | The upper limit on the number of dynamically generated sub-workflow instances. Once this limit is exceeded, the remaining dynamically generated sub-workflow instances are not executed. |
+| Parallelism | The parallelism of the dynamically generated sub-workflow instances, i.e. the number of sub-workflow instances executed at the same time. |
+| Param Value | The input parameters of the dynamically generated sub-workflow instances. Multiple parameters are supported, and the values of each parameter are separated by a delimiter. |
+| Filter Condition | The filter condition of the dynamically generated sub-workflow instances. Multiple filter values are supported, separated by commas, e.g. `2022,2023`, which filters out the parameter groups whose values contain `2022` or `2023`. |
+
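+The sketch below illustrates the filter semantics described above, assuming the filter condition excludes any parameter combination whose values contain one of the comma-separated filter strings (a hypothetical helper, not the scheduler's code):
+
+```java
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class FilterConditionSketch {
+
+    // Keep only the combinations whose values contain none of the comma-separated filter strings.
+    static List<Map<String, String>> applyFilter(List<Map<String, String>> combinations, String filterCondition) {
+        List<String> filters = Arrays.asList(filterCondition.split(","));
+        return combinations.stream()
+                .filter(combination -> combination.values().stream()
+                        .noneMatch(value -> filters.stream().anyMatch(value::contains)))
+                .collect(Collectors.toList());
+    }
+}
+```
+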
+## Task Parameters Output
+
+The output parameters of the dynamic node are collected from its sub-workflows: the output parameters of every sub-workflow instance are gathered into a list, which becomes the output parameter of the dynamic node.
+
+A downstream task can reference this output via `${dynamic.out(TaskName)}`.
+
+The value is a JSON string, as shown below:
+
+```json
+[
+ { "dynParams":{ "a":"a1", "b":"b1" }, "outputValue":{ "p":"a1-b1" }, "mappedTimes":1 },
+ { "dynParams":{ "a":"a2", "b":"b1" }, "outputValue":{ "p":"a2-b1" }, "mappedTimes":2 },
+ { "dynParams":{ "a":"a3", "b":"b1" }, "outputValue":{ "p":"a3-b1" }, "mappedTimes":3 }
+]
+```
+
+- `dynParams` is the input parameter map of the sub-workflow instance
+- `outputValue` is the output parameter map of the sub-workflow instance. For example, `p` here is a string that concatenates the sub-workflow parameters `a` and `b` and is exported as the variable `p`
+- `mappedTimes` is the execution index of the sub-workflow instance, starting from 1
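+
+For illustration only, a downstream program that receives the substituted value of `${dynamic.out(TaskName)}` could parse it as a generic JSON list. A minimal sketch, assuming Jackson is available (class and variable names here are hypothetical):
+
+```java
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.List;
+import java.util.Map;
+
+public class DynamicOutputReader {
+
+    public static void main(String[] args) throws Exception {
+        // The JSON value substituted for ${dynamic.out(TaskName)}.
+        String json = "[{\"dynParams\":{\"a\":\"a1\",\"b\":\"b1\"},\"outputValue\":{\"p\":\"a1-b1\"},\"mappedTimes\":1}]";
+        List<Map<String, Object>> outputs =
+                new ObjectMapper().readValue(json, new TypeReference<List<Map<String, Object>>>() {
+                });
+        for (Map<String, Object> output : outputs) {
+            System.out.println(output.get("mappedTimes") + " -> " + output.get("outputValue"));
+        }
+    }
+}
+```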
+
+## Running Status
+
+After the dynamic task starts, it calculates all parameter combinations from the input parameter lists and then creates one sub-workflow instance for each combination.
+
+While the dynamic task is running, it periodically checks the statistics of all of its current sub-workflow instances. If the configured parallelism is greater than the number of running sub-workflow instances, it triggers the start of an appropriate number of additional sub-workflow instances (the sub-workflow instances are created first and started later).
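+
+A minimal, self-contained sketch of that periodic check, with hypothetical names, simplified from the `DynamicAsyncTaskExecuteFunction` added in this change:
+
+```java
+import java.util.List;
+
+public class ParallelismCheckSketch {
+
+    enum State { WAIT_TO_RUN, RUNNING, SUCCESS, FAILURE }
+
+    // On each periodic check, decide how many waiting sub-workflow instances may be started.
+    static long instancesToStart(List<State> subWorkflowStates, int parallelism) {
+        long running = subWorkflowStates.stream().filter(s -> s == State.RUNNING).count();
+        long waiting = subWorkflowStates.stream().filter(s -> s == State.WAIT_TO_RUN).count();
+        return Math.min(Math.max(parallelism - running, 0), waiting);
+    }
+}
+```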
+
+The running state of the sub-workflow instances is shown below:
+
+![dynamic_running](../../../../img/tasks/demo/dynamic_running.png)
+
+The dynamic task succeeds only after all of its sub-workflow instances have run successfully.
diff --git a/docs/docs/zh/guide/task/dynamic.md b/docs/docs/zh/guide/task/dynamic.md
new file mode 100644
index 0000000000..ee1d20adb1
--- /dev/null
+++ b/docs/docs/zh/guide/task/dynamic.md
@@ -0,0 +1,76 @@
+# 动态节点
+
+## 综述
+
+动态节点可以通过输入多个参数列表,通过笛卡尔积计算出所有的参数组合,然后将每个参数组合作为一个子工作流节点执行。
+
+比如我们有一个工作流,它具有两个输入参数,a, b。
+我们可以通过动态节点,将这个工作流定义当做一个节点,然后输入参数列表
+- 参数a:a1, a2
+- 参数b:b1, b2
+
+那么动态节点会计算出四个参数组合,分别是
+- a1, b1
+- a1, b2
+- a2, b1
+- a2, b2
+
+然后将这四个参数组合作为子工作流节点的启动参数执行,共生成四个子工作流节点。
+
+## 创建任务
+
+- 点击项目管理 -> 项目名称 -> 工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面:
+
+- 拖动工具栏中的任务节点到画板中。
+
+任务定义如下图所示:
+
+![dynamic_definition](../../../../img/tasks/demo/dynamic_definition.png)
+
+## 任务参数
+
+[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
+[//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。)
+
+- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
+
+| **任务参数** | **描述** |
+|----------|------------------------------------------------------------------------------|
+| 子节点 | 是选择子工作流的工作流定义,右上角进入该子节点可以跳转到所选子工作流的工作流定义 |
+| 动态生成实例上限 | 是指动态生成的子工作流实例的上限,超过该上限后,动态生成的子工作流实例将不再执行 |
+| 并行度 | 是指动态生成的子工作流实例的并行度,即同时执行的子工作流实例的数量 |
+| 取值参数 | 是指动态生成的子工作流实例的参数,支持多个参数,参数之间用分隔符分隔 |
+| 过滤条件     | 是指动态生成的子工作流实例的过滤条件,支持多个过滤值,过滤条件之间用逗号分隔, 如 `2022,2023`, 则会过滤掉包含2022和2023的值的参数组 |
+
+## 任务参数输出
+
+动态节点的输出参数,是指子工作流的输出参数,所有子工作流的输出参数都会被收集到一个列表中,作为动态节点的输出参数。
+
+下游任务引用的时候,可以通过 `${dynamic.out(TaskName)}` 的方式引用。
+
+值为一个 JSON,样例如下:
+
+```json
+[
+ { "dynParams":{ "a":"a1", "b":"b1" }, "outputValue":{ "p":"a1-b1" }, "mappedTimes":1 },
+ { "dynParams":{ "a":"a2", "b":"b1" }, "outputValue":{ "p":"a2-b1" }, "mappedTimes":2 },
+ { "dynParams":{ "a":"a3", "b":"b1" }, "outputValue":{ "p":"a3-b1" }, "mappedTimes":3 }
+]
+```
+
+其中
+- `dynParams` 是子工作流的输入参数
+- `outputValue` 是子工作流的输出参数, 如这里的`p`为将子工作流的参数`a`和`b`拼接起来的字符串,以变量`p`的形式输出
+- `mappedTimes` 是子工作流的执行的index,从1开始
+
+## 运行状态
+
+Dynamic任务启动后,会根据输入参数列表,计算出所有的参数组合,然后对每一个参数组合,创建一个子工作流实例。
+
+Dynamic运行时,会定期检测当前所有子工作流实例的统计信息,如果并行度大于运行中的子工作流实例的数量,则会触发启动合适数量的工作流实例(工作流实例是先创建,后续再触发启动)。
+
+如下图所示。
+
+![dynamic_running](../../../../img/tasks/demo/dynamic_running.png)
+
+当且仅当所有的子工作流实例全部运行成功时,Dynamic 任务才会运行成功。
diff --git a/docs/img/tasks/demo/dynamic_definition.png b/docs/img/tasks/demo/dynamic_definition.png
new file mode 100644
index 0000000000..e3eef9fa2c
Binary files /dev/null and b/docs/img/tasks/demo/dynamic_definition.png differ
diff --git a/docs/img/tasks/demo/dynamic_running.png b/docs/img/tasks/demo/dynamic_running.png
new file mode 100644
index 0000000000..cb7d8bcfaa
Binary files /dev/null and b/docs/img/tasks/demo/dynamic_running.png differ
diff --git a/docs/img/tasks/icons/dynamic.png b/docs/img/tasks/icons/dynamic.png
new file mode 100644
index 0000000000..6df7485872
Binary files /dev/null and b/docs/img/tasks/icons/dynamic.png differ
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index b64f51a70b..947cb05951 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
@@ -319,6 +320,28 @@ public class ProcessInstanceController extends BaseController {
return returnDataList(result);
}
+ /**
+ * query dynamic sub process instance detail info by task id
+ *
+ * @param loginUser login user
+ * @param taskId task id
+ * @return sub process instance detail
+ */
+ @Operation(summary = "queryDynamicSubWorkflowInstances", description = "QUERY_DYNAMIC_SUBPROCESS_INSTANCE_BY_TASK_CODE_NOTES")
+ @Parameters({
+ @Parameter(name = "taskId", description = "taskInstanceId", required = true, schema = @Schema(implementation = int.class, example = "100"))
+ })
+ @GetMapping(value = "/query-dynamic-sub-workflows")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<List<DynamicSubWorkflowDto>> queryDynamicSubWorkflowInstances(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @RequestParam("taskId") Integer taskId) {
+ List<DynamicSubWorkflowDto> dynamicSubWorkflowDtos =
+ processInstanceService.queryDynamicSubWorkflowInstances(loginUser, taskId);
+ return new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg(), dynamicSubWorkflowDtos);
+ }
+
/**
* query process instance global variables and local variables
*
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java
new file mode 100644
index 0000000000..5a482f0a6c
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/DynamicSubWorkflowDto.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.dto;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+
+import java.util.Map;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+public class DynamicSubWorkflowDto {
+
+ private long processInstanceId;
+
+ private String name;
+
+ private long index;
+
+ private Map<String, String> parameters;
+
+ private WorkflowExecutionStatus state;
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index d29235439d..4ad45709cf 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -271,6 +271,8 @@ public enum Status {
NOT_SUPPORT_SSO(10211, "Not support SSO login.", "不支持SSO登录"),
STATE_CODE_ERROR(10212, "state inconsistency or state and code not pair", "状态码前后不一致或状态码和code不匹配"),
+ TASK_INSTANCE_NOT_DYNAMIC_TASK(10213, "task instance {0} is not dynamic", "任务实例[{0}]不是Dynamic类型"),
+
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 304d0139e2..ccb3f98b4d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service;
+import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
@@ -135,6 +136,9 @@ public interface ProcessInstanceService {
long projectCode,
Integer taskId);
+ List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser,
+ Integer taskId);
+
/**
* update process instance
*
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 26606caa3b..1df2ff4ce4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -30,6 +30,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
+import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
@@ -44,6 +45,7 @@ import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
@@ -57,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@@ -66,6 +69,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
@@ -93,6 +97,7 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -184,6 +189,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
private ScheduleMapper scheduleMapper;
+ @Autowired
+ private RelationSubWorkflowMapper relationSubWorkflowMapper;
+
@Autowired
private AlertDao alertDao;
@@ -488,6 +496,66 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
+ @Override
+ public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) {
+ TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskId);
+ Map<String, Object> result = new HashMap<>();
+ if (taskInstance == null) {
+ putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
+ throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
+ }
+
+ TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskInstance.getTaskCode());
+ if (taskDefinition == null) {
+ putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
+ throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
+ }
+
+ if (!taskInstance.isDynamic()) {
+ putMsg(result, Status.TASK_INSTANCE_NOT_DYNAMIC_TASK, taskInstance.getName());
+ throw new ServiceException(Status.TASK_INSTANCE_NOT_DYNAMIC_TASK, taskInstance.getName());
+ }
+ List<RelationSubWorkflow> relationSubWorkflows = relationSubWorkflowMapper
+ .queryAllSubProcessInstance(Long.valueOf(taskInstance.getProcessInstanceId()),
+ taskInstance.getTaskCode());
+ List<Long> allSubProcessInstanceId = relationSubWorkflows.stream()
+ .map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList());
+ List<ProcessInstance> allSubWorkflows = processInstanceDao.queryBatchIds(allSubProcessInstanceId);
+
+ if (allSubWorkflows == null || allSubWorkflows.isEmpty()) {
+ putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
+ throw new ServiceException(Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
+ }
+ Long subWorkflowCode = allSubWorkflows.get(0).getProcessDefinitionCode();
+ int subWorkflowVersion = allSubWorkflows.get(0).getProcessDefinitionVersion();
+ ProcessDefinition subProcessDefinition =
+ processService.findProcessDefinition(subWorkflowCode, subWorkflowVersion);
+ if (subProcessDefinition == null) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, subWorkflowCode);
+ throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, subWorkflowCode);
+ }
+
+ allSubWorkflows.sort(Comparator.comparing(ProcessInstance::getId));
+
+ List<DynamicSubWorkflowDto> allDynamicSubWorkflowDtos = new ArrayList<>();
+ int index = 1;
+ for (ProcessInstance processInstance : allSubWorkflows) {
+ DynamicSubWorkflowDto dynamicSubWorkflowDto = new DynamicSubWorkflowDto();
+ dynamicSubWorkflowDto.setProcessInstanceId(processInstance.getId());
+ dynamicSubWorkflowDto.setIndex(index);
+ dynamicSubWorkflowDto.setState(processInstance.getState());
+ dynamicSubWorkflowDto.setName(subProcessDefinition.getName());
+ Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
+ String parameter = commandParamMap.get(CommandKeyConstants.CMD_DYNAMIC_START_PARAMS);
+ dynamicSubWorkflowDto.setParameters(JSONUtils.toMap(parameter));
+ allDynamicSubWorkflowDtos.add(dynamicSubWorkflowDto);
+ index++;
+
+ }
+
+ return allDynamicSubWorkflowDtos;
+ }
+
/**
* add dependent result for dependent task
*/
diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
index 7a21c36946..9105d50697 100644
--- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml
+++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml
@@ -40,6 +40,7 @@ task:
- 'DEPENDENT'
- 'CONDITIONS'
- 'SWITCH'
+ - 'DYNAMIC'
dataIntegration:
- 'SEATUNNEL'
- 'DATAX'
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java
index c1f801257a..68dfdd3a6f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/CommandKeyConstants.java
@@ -45,6 +45,8 @@ public class CommandKeyConstants {
public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
+ public static final String CMD_DYNAMIC_START_PARAMS = "dynamicParams";
+
/**
* complement data start date
*/
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 7aa2b881b3..27a2dc7ac3 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -56,6 +56,7 @@ public enum CommandType {
RECOVER_WAITING_THREAD(10, "recover waiting thread"),
RECOVER_SERIAL_WAIT(11, "recover serial wait"),
EXECUTE_TASK(12, "start a task node in a process instance"),
+ DYNAMIC_GENERATION(13, "dynamic generation"),
;
CommandType(int code, String descp) {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
index 4fced36110..e8ec2dbc08 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
@@ -40,6 +40,7 @@ public enum WorkflowExecutionStatus {
SERIAL_WAIT(14, "serial wait"),
READY_BLOCK(15, "ready block"),
BLOCK(16, "block"),
+ WAIT_TO_RUN(17, "wait to run"),
;
private static final Map CODE_MAP = new HashMap<>();
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
index 9d30aa71ca..6d6e5497d4 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
@@ -59,6 +59,7 @@ public class BusinessTimeUtils {
case RECOVER_SUSPENDED_PROCESS:
case START_FAILURE_TASK_PROCESS:
case REPEAT_RUNNING:
+ case DYNAMIC_GENERATION:
case SCHEDULER:
default:
businessDate = addDays(new Date(), -1);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java
new file mode 100644
index 0000000000..bc19a9d130
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/RelationSubWorkflow.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.entity;
+
+import lombok.Data;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+
+@Data
+@TableName("t_ds_relation_sub_workflow")
+public class RelationSubWorkflow {
+
+ /**
+ * id
+ */
+ @TableId(value = "id", type = IdType.AUTO)
+ private Integer id;
+
+ /**
+ * parent process instance id
+ */
+ private Long parentWorkflowInstanceId;
+
+ /**
+ * parent task code
+ */
+ private Long parentTaskCode;
+
+ /**
+ * sub workflow instance id
+ */
+ private Long subWorkflowInstanceId;
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index cf5eb62ec7..0a0b4876e4 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SEC_2_MINUT
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DYNAMIC;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
@@ -357,6 +358,10 @@ public class TaskInstance implements Serializable {
return TASK_TYPE_DEPENDENT.equalsIgnoreCase(this.taskType);
}
+ public boolean isDynamic() {
+ return TASK_TYPE_DYNAMIC.equalsIgnoreCase(this.taskType);
+ }
+
public boolean isConditionsTask() {
return TASK_TYPE_CONDITIONS.equalsIgnoreCase(this.taskType);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java
new file mode 100644
index 0000000000..3c8fb2084b
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.mapper;
+
+import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * relation sub workflow mapper interface
+ */
+public interface RelationSubWorkflowMapper extends BaseMapper<RelationSubWorkflow> {
+
+ int batchInsert(@Param("relationSubWorkflows") List<RelationSubWorkflow> relationSubWorkflows);
+
+ List<RelationSubWorkflow> queryAllSubProcessInstance(@Param("parentWorkflowInstanceId") Long parentWorkflowInstanceId,
+ @Param("parentTaskCode") Long parentTaskCode);
+
+ RelationSubWorkflow queryParentWorkflowInstance(@Param("subWorkflowInstanceId") Long subWorkflowInstanceId);
+
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index 0144356dfe..32e9f97133 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -38,6 +38,8 @@ public interface ProcessInstanceDao {
*/
public int upsertProcessInstance(ProcessInstance processInstance);
+ List queryBatchIds(List processInstanceIds);
+
void deleteByIds(List needToDeleteWorkflowInstanceIds);
void deleteById(Integer workflowInstanceId);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index 5117145fd4..1d4775cb29 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.commons.collections4.CollectionUtils;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -65,6 +66,14 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
}
}
+ @Override
+ public List queryBatchIds(List processInstanceIds) {
+ if (CollectionUtils.isEmpty(processInstanceIds)) {
+ return new ArrayList<>();
+ }
+ return processInstanceMapper.selectBatchIds(processInstanceIds);
+ }
+
@Override
public void deleteByIds(List needToDeleteWorkflowInstanceIds) {
if (CollectionUtils.isEmpty(needToDeleteWorkflowInstanceIds)) {
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml
new file mode 100644
index 0000000000..59701b432c
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/RelationSubWorkflowMapper.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper">
+    <sql id="baseSql">
+        id, parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id
+    </sql>
+    <insert id="batchInsert">
+        insert into t_ds_relation_sub_workflow (parent_workflow_instance_id, parent_task_code, sub_workflow_instance_id)
+        values
+        <foreach collection="relationSubWorkflows" item="relationSubWorkflow" separator=",">
+            (#{relationSubWorkflow.parentWorkflowInstanceId}, #{relationSubWorkflow.parentTaskCode}, #{relationSubWorkflow.subWorkflowInstanceId})
+        </foreach>
+    </insert>
+    <select id="queryAllSubProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow">
+        select
+        <include refid="baseSql"/>
+        from t_ds_relation_sub_workflow
+        where parent_workflow_instance_id = #{parentWorkflowInstanceId}
+        and parent_task_code = #{parentTaskCode}
+    </select>
+    <select id="queryParentWorkflowInstance" resultType="org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow">
+        select
+        <include refid="baseSql"/>
+        from t_ds_relation_sub_workflow
+        where sub_workflow_instance_id = #{subWorkflowInstanceId}
+    </select>
+</mapper>
index ff73f5c13d..e63c5b53ea 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -2042,3 +2042,16 @@ CREATE TABLE t_ds_trigger_relation
PRIMARY KEY (id),
UNIQUE KEY t_ds_trigger_relation_UN(trigger_type,job_id,trigger_code)
);
+
+
+DROP TABLE IF EXISTS t_ds_relation_sub_workflow;
+CREATE TABLE t_ds_relation_sub_workflow (
+ id BIGINT AUTO_INCREMENT NOT NULL,
+ parent_workflow_instance_id BIGINT NOT NULL,
+ parent_task_code BIGINT NOT NULL,
+ sub_workflow_instance_id BIGINT NOT NULL,
+ PRIMARY KEY (id),
+ INDEX idx_parent_workflow_instance_id (parent_workflow_instance_id),
+ INDEX idx_parent_task_code (parent_task_code),
+ INDEX idx_sub_workflow_instance_id (sub_workflow_instance_id)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index d45305336b..acd5422440 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -2016,3 +2016,16 @@ CREATE TABLE `t_ds_trigger_relation` (
KEY `t_ds_trigger_relation_trigger_code_IDX` (`trigger_code`),
UNIQUE KEY `t_ds_trigger_relation_UN` (`trigger_type`,`job_id`,`trigger_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
+
+
+DROP TABLE IF EXISTS `t_ds_relation_sub_workflow`;
+CREATE TABLE `t_ds_relation_sub_workflow` (
+ `id` bigint NOT NULL AUTO_INCREMENT,
+ `parent_workflow_instance_id` bigint NOT NULL,
+ `parent_task_code` bigint NOT NULL,
+ `sub_workflow_instance_id` bigint NOT NULL,
+ PRIMARY KEY (`id`),
+ KEY `idx_parent_workflow_instance_id` (`parent_workflow_instance_id`),
+ KEY `idx_parent_task_code` (`parent_task_code`),
+ KEY `idx_sub_workflow_instance_id` (`sub_workflow_instance_id`)
+);
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index afb8176399..d219d65f4d 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -2001,3 +2001,16 @@ CREATE TABLE t_ds_trigger_relation (
PRIMARY KEY (id),
CONSTRAINT t_ds_trigger_relation_unique UNIQUE (trigger_type,job_id,trigger_code)
);
+
+DROP TABLE IF EXISTS t_ds_relation_sub_workflow;
+CREATE TABLE t_ds_relation_sub_workflow (
+ id serial NOT NULL,
+ parent_workflow_instance_id BIGINT NOT NULL,
+ parent_task_code BIGINT NOT NULL,
+ sub_workflow_instance_id BIGINT NOT NULL,
+ PRIMARY KEY (id)
+);
+CREATE INDEX idx_parent_workflow_instance_id ON t_ds_relation_sub_workflow (parent_workflow_instance_id);
+CREATE INDEX idx_parent_task_code ON t_ds_relation_sub_workflow (parent_task_code);
+CREATE INDEX idx_sub_workflow_instance_id ON t_ds_relation_sub_workflow (sub_workflow_instance_id);
+
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
index b4d0ca2950..a46732f498 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -412,3 +412,27 @@ d//
delimiter ;
CALL add_improvement_workflow_run_tenant;
DROP PROCEDURE add_improvement_workflow_run_tenant;
+
+-- uc_dolphin_T_t_ds_relation_sub_workflow
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_relation_sub_workflow;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_relation_sub_workflow()
+BEGIN
+ IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+ WHERE TABLE_NAME='t_ds_relation_sub_workflow'
+ AND TABLE_SCHEMA=(SELECT DATABASE()))
+ THEN
+CREATE TABLE `t_ds_relation_sub_workflow` (
+ `id` bigint NOT NULL AUTO_INCREMENT,
+ `parent_workflow_instance_id` bigint NOT NULL,
+ `parent_task_code` bigint NOT NULL,
+ `sub_workflow_instance_id` bigint NOT NULL,
+ PRIMARY KEY (`id`),
+ KEY `idx_parent_workflow_instance_id` (`parent_workflow_instance_id`),
+ KEY `idx_parent_task_code` (`parent_task_code`),
+ KEY `idx_sub_workflow_instance_id` (`sub_workflow_instance_id`)
+);
+END IF;
+END;
+
+d//
+delimiter ;
+CALL uc_dolphin_T_t_ds_relation_sub_workflow;
+DROP PROCEDURE uc_dolphin_T_t_ds_relation_sub_workflow;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
index 233676c3a5..df25de7574 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
@@ -315,3 +315,27 @@ d//
delimiter ;
select add_improvement_workflow_run_tenant();
DROP FUNCTION add_improvement_workflow_run_tenant();
+
+-- uc_dolphin_T_t_ds_relation_sub_workflow
+CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_relation_sub_workflow()
+RETURNS VOID AS $$
+BEGIN
+ IF NOT EXISTS (
+ SELECT 1
+ FROM information_schema.columns
+ WHERE table_name='t_ds_relation_sub_workflow'
+ AND table_schema=current_schema()
+ ) THEN
+CREATE TABLE t_ds_relation_sub_workflow (
+ id serial NOT NULL,
+ parent_workflow_instance_id BIGINT NOT NULL,
+ parent_task_code BIGINT NOT NULL,
+ sub_workflow_instance_id BIGINT NOT NULL,
+ PRIMARY KEY (id)
+);
+CREATE INDEX idx_parent_workflow_instance_id ON t_ds_relation_sub_workflow (parent_workflow_instance_id);
+CREATE INDEX idx_parent_task_code ON t_ds_relation_sub_workflow (parent_task_code);
+CREATE INDEX idx_sub_workflow_instance_id ON t_ds_relation_sub_workflow (sub_workflow_instance_id);
+END IF;
+END;
+$$ LANGUAGE plpgsql;
+
+select uc_dolphin_T_t_ds_relation_sub_workflow();
+DROP FUNCTION uc_dolphin_T_t_ds_relation_sub_workflow();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
index e01955d81a..16476ad7f7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -37,7 +37,7 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
- StateEvent stateEvent) throws StateEventHandleError {
+ StateEvent stateEvent) throws StateEventHandleError, StateEventHandleException {
TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
TaskMetrics.incTaskInstanceByState("timeout");
@@ -62,6 +62,7 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler {
|| TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) {
if (taskExecuteRunnableMap.containsKey(taskInstance.getTaskCode())) {
taskExecuteRunnableMap.get(taskInstance.getTaskCode()).timeout();
+ workflowExecuteRunnable.taskFinished(taskInstance);
} else {
log.warn(
"cannot find the task processor for task {}, so skip task processor action.",
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 12fc29de29..06f2bfc549 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -445,8 +445,7 @@ public class WorkflowExecuteRunnable implements Callable {
if (taskInstance.getState().isSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
- // todo: merge the last taskInstance
- processInstance.setVarPool(taskInstance.getVarPool());
+ mergeTaskInstanceVarPool(taskInstance);
processInstanceDao.upsertProcessInstance(processInstance);
// save the cacheKey only if the task is defined as cache task and the task is success
if (taskInstance.getIsCache().equals(Flag.YES)) {
@@ -2212,4 +2211,25 @@ public class WorkflowExecuteRunnable implements Callable {
private boolean isExecutedOnMaster(String host) {
return host.endsWith(masterAddress.split(Constants.COLON)[1]);
}
+
+ private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
+ String taskVarPoolJson = taskInstance.getVarPool();
+ if (StringUtils.isEmpty(taskVarPoolJson)) {
+ return;
+ }
+ String processVarPoolJson = processInstance.getVarPool();
+ if (StringUtils.isEmpty(processVarPoolJson)) {
+ processInstance.setVarPool(taskVarPoolJson);
+ return;
+ }
+ List<Property> processVarPool = new ArrayList<>(JSONUtils.toList(processVarPoolJson, Property.class));
+ List<Property> taskVarPool = JSONUtils.toList(taskVarPoolJson, Property.class);
+ Set<String> newProcessVarPoolKeys = taskVarPool.stream().map(Property::getProp).collect(Collectors.toSet());
+ processVarPool = processVarPool.stream().filter(property -> !newProcessVarPoolKeys.contains(property.getProp()))
+ .collect(Collectors.toList());
+
+ processVarPool.addAll(taskVarPool);
+
+ processInstance.setVarPool(JSONUtils.toJsonString(processVarPool));
+ }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
index f196d85153..c5689f6a1b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask;
+import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask;
import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask;
import java.util.Set;
@@ -38,7 +39,8 @@ public class MasterTaskExecuteRunnableFactoryBuilder {
private static final Set ASYNC_TASK_TYPE = Sets.newHashSet(
DependentLogicTask.TASK_TYPE,
- SubWorkflowLogicTask.TASK_TYPE);
+ SubWorkflowLogicTask.TASK_TYPE,
+ DynamicLogicTask.TASK_TYPE);
public MasterDelayTaskExecuteRunnableFactory extends MasterDelayTaskExecuteRunnable> createWorkerDelayTaskExecuteRunnableFactory(String taskType) {
if (ASYNC_TASK_TYPE.contains(taskType)) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java
new file mode 100644
index 0000000000..b433b6f3bc
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicAsyncTaskExecuteFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
+
+import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_DYNAMIC_START_PARAMS;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
+import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DynamicAsyncTaskExecuteFunction implements AsyncTaskExecuteFunction {
+
+ private static final Duration TASK_EXECUTE_STATE_CHECK_INTERVAL = Duration.ofSeconds(10);
+
+ private static final String OUTPUT_KEY = "dynamic.out";
+
+ private final ProcessInstance processInstance;
+
+ private final TaskInstance taskInstance;
+
+ private final SubWorkflowService subWorkflowService;
+
+ private final CommandMapper commandMapper;
+
+ private final int degreeOfParallelism;
+
+ private final DynamicLogicTask logicTask;
+
+ public DynamicAsyncTaskExecuteFunction(TaskExecutionContext taskExecutionContext,
+ ProcessInstance processInstance,
+ TaskInstance taskInstance,
+ DynamicLogicTask dynamicLogicTask,
+ CommandMapper commandMapper,
+ SubWorkflowService subWorkflowService,
+ int degreeOfParallelism) {
+ this.processInstance = processInstance;
+ this.taskInstance = taskInstance;
+ this.logicTask = dynamicLogicTask;
+ this.degreeOfParallelism = degreeOfParallelism;
+
+ this.commandMapper = commandMapper;
+ this.subWorkflowService = subWorkflowService;
+ }
+
+ @Override
+ public @NonNull AsyncTaskExecutionStatus getAsyncTaskExecutionStatus() {
+ List<ProcessInstance> allSubProcessInstance = getAllSubProcessInstance();
+ int totalSubProcessInstanceCount = allSubProcessInstance.size();
+
+ List<ProcessInstance> finishedSubProcessInstance =
+ subWorkflowService.filterFinishProcessInstances(allSubProcessInstance);
+
+ if (finishedSubProcessInstance.size() == totalSubProcessInstanceCount) {
+ log.info("all sub process instance finish");
+ int successCount = subWorkflowService.filterSuccessProcessInstances(finishedSubProcessInstance).size();
+ log.info("success sub process instance count: {}", successCount);
+ if (successCount == totalSubProcessInstanceCount) {
+ log.info("all sub process instance success");
+ setOutputParameters();
+ return AsyncTaskExecutionStatus.SUCCESS;
+ } else {
+ int failedCount = totalSubProcessInstanceCount - successCount;
+ log.info("failed sub process instance count: {}", failedCount);
+ return AsyncTaskExecutionStatus.FAILED;
+ }
+ }
+
+ if (logicTask.isCancel()) {
+ return AsyncTaskExecutionStatus.FAILED;
+ }
+
+ int runningCount = subWorkflowService.filterRunningProcessInstances(allSubProcessInstance).size();
+ int startCount = degreeOfParallelism - runningCount;
+ if (startCount > 0) {
+ log.info("There are {} sub process instances that can be started", startCount);
+ startSubProcessInstances(allSubProcessInstance, startCount);
+ }
+ // query the status of sub workflow instance
+ return AsyncTaskExecutionStatus.RUNNING;
+ }
+
+ private void setOutputParameters() {
+ log.info("set varPool");
+ List<ProcessInstance> allSubProcessInstance = getAllSubProcessInstance();
+
+ List<DynamicOutput> dynamicOutputs = new ArrayList<>();
+ int index = 1;
+ for (ProcessInstance processInstance : allSubProcessInstance) {
+ DynamicOutput dynamicOutput = new DynamicOutput();
+ Map<String, String> dynamicParams =
+ JSONUtils.toMap(JSONUtils.toMap(processInstance.getCommandParam()).get(CMD_DYNAMIC_START_PARAMS));
+ dynamicOutput.setDynParams(dynamicParams);
+
+ Map<String, String> outputValueMap = new HashMap<>();
+ List<Property> propertyList = subWorkflowService.getWorkflowOutputParameters(processInstance);
+ for (Property property : propertyList) {
+ outputValueMap.put(property.getProp(), property.getValue());
+ }
+
+ dynamicOutput.setOutputValue(outputValueMap);
+ dynamicOutput.setMappedTimes(index++);
+ dynamicOutputs.add(dynamicOutput);
+ }
+
+ Property property = new Property();
+ property.setProp(String.format("%s(%s)", OUTPUT_KEY, taskInstance.getName()));
+ property.setDirect(Direct.OUT);
+ property.setType(DataType.VARCHAR);
+ property.setValue(JSONUtils.toJsonString(dynamicOutputs));
+
+ List<Property> taskPropertyList = new ArrayList<>(JSONUtils.toList(taskInstance.getVarPool(), Property.class));
+ taskPropertyList.add(property);
+ logicTask.getTaskParameters().setVarPool(JSONUtils.toJsonString(taskPropertyList));
+
+ log.info("set property: {}", property);
+ }
+
+ private void startSubProcessInstances(List<ProcessInstance> allSubProcessInstance, int startCount) {
+ List<ProcessInstance> waitingProcessInstances =
+ subWorkflowService.filterWaitToRunProcessInstances(allSubProcessInstance);
+
+ for (int i = 0; i < Math.min(startCount, waitingProcessInstances.size()); i++) {
+ ProcessInstance subProcessInstance = waitingProcessInstances.get(i);
+ Map<String, String> parameters = JSONUtils.toMap(DynamicCommandUtils
+ .getDataFromCommandParam(subProcessInstance.getCommandParam(), CMD_DYNAMIC_START_PARAMS));
+ Command command = DynamicCommandUtils.createCommand(this.processInstance,
+ subProcessInstance.getProcessDefinitionCode(), subProcessInstance.getProcessDefinitionVersion(),
+ parameters);
+ command.setProcessInstanceId(subProcessInstance.getId());
+ commandMapper.insert(command);
+ log.info("start sub process instance, sub process instance id: {}, command: {}", subProcessInstance.getId(),
+ command);
+ }
+ }
+
+ public List<ProcessInstance> getAllSubProcessInstance() {
+ return subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode());
+ }
+
+ @Override
+ public @NonNull Duration getAsyncTaskStateCheckInterval() {
+ return TASK_EXECUTE_STATE_CHECK_INTERVAL;
+ }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java
new file mode 100644
index 0000000000..e360a8857e
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicCommandUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
+
+import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+public class DynamicCommandUtils {
+
+ static public Command createCommand(ProcessInstance processInstance,
+ Long subProcessDefinitionCode,
+ Integer subProcessDefinitionVersion,
+ Map<String, String> parameters) {
+ Command command = new Command();
+ if (processInstance.getCommandType().equals(CommandType.START_PROCESS)) {
+ command.setCommandType(CommandType.DYNAMIC_GENERATION);
+ } else {
+ command.setCommandType(processInstance.getCommandType());
+ }
+ command.setProcessDefinitionCode(subProcessDefinitionCode);
+ command.setProcessDefinitionVersion(subProcessDefinitionVersion);
+ command.setTaskDependType(TaskDependType.TASK_POST);
+ command.setFailureStrategy(processInstance.getFailureStrategy());
+ command.setWarningType(processInstance.getWarningType());
+
+ String globalParams = processInstance.getGlobalParams();
+ if (StringUtils.isNotEmpty(globalParams)) {
+ List<Property> parentParams = Lists.newArrayList(JSONUtils.toList(globalParams, Property.class));
+ for (Property parentParam : parentParams) {
+ parameters.put(parentParam.getProp(), parentParam.getValue());
+ }
+ }
+
+ addDataToCommandParam(command, CommandKeyConstants.CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(parameters));
+ command.setExecutorId(processInstance.getExecutorId());
+ command.setWarningGroupId(processInstance.getWarningGroupId());
+ command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+ command.setWorkerGroup(processInstance.getWorkerGroup());
+ command.setDryRun(processInstance.getDryRun());
+ return command;
+ }
+
+ static public String getDataFromCommandParam(String commandParam, String key) {
+ Map<String, String> cmdParam = JSONUtils.toMap(commandParam);
+ return cmdParam.get(key);
+ }
+
+ static void addDataToCommandParam(Command command, String key, String data) {
+ Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
+ if (cmdParam == null) {
+ cmdParam = new HashMap<>();
+ }
+ cmdParam.put(key, data);
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ }
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
new file mode 100644
index 0000000000..d3494042cd
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner.task.dynamic;
+
+import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.DynamicInputParameter;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.remote.command.workflow.WorkflowStateEventChangeRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
+import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction;
+import org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.subworkflow.SubWorkflowService;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+
+@Slf4j
+public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {
+
+ public static final String TASK_TYPE = "DYNAMIC";
+ private final ProcessInstanceDao processInstanceDao;
+
+ private final SubWorkflowService subWorkflowService;
+
+ private final ProcessDefinitionMapper processDefineMapper;
+
+ private final CommandMapper commandMapper;
+
+ private final ProcessService processService;
+
+ private ProcessInstance processInstance;
+
+ private TaskInstance taskInstance;
+
+ private final MasterRpcClient masterRpcClient;
+
+ private boolean haveBeenCanceled = false;
+
+ public DynamicLogicTask(TaskExecutionContext taskExecutionContext,
+ ProcessInstanceDao processInstanceDao,
+ TaskInstanceDao taskInstanceDao,
+ SubWorkflowService subWorkflowService,
+ ProcessService processService,
+ MasterRpcClient masterRpcClient,
+ ProcessDefinitionMapper processDefineMapper,
+ CommandMapper commandMapper) {
+ super(taskExecutionContext,
+ JSONUtils.parseObject(taskExecutionContext.getTaskParams(), new TypeReference<DynamicParameters>() {
+ }));
+ this.processInstanceDao = processInstanceDao;
+ this.subWorkflowService = subWorkflowService;
+ this.processService = processService;
+ this.masterRpcClient = masterRpcClient;
+ this.processDefineMapper = processDefineMapper;
+ this.commandMapper = commandMapper;
+
+ this.processInstance =
+ processInstanceDao.queryByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId());
+ this.taskInstance = taskInstanceDao.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
+ }
+
+ @Override
+ public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws MasterTaskExecuteException {
+ List