diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index e92a68c8c1..2f5a50c813 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -141,6 +141,10 @@ export default {
title: 'Switch',
link: '/en-us/docs/dev/user_doc/guide/task/switch.html',
},
+ {
+ title: 'SeaTunnel',
+ link: '/en-us/docs/dev/user_doc/guide/task/seatunnel.html',
+ },
{
title: 'Amazon EMR',
link: '/en-us/docs/dev/user_doc/guide/task/emr.html',
@@ -529,6 +533,10 @@ export default {
title: 'Switch',
link: '/zh-cn/docs/dev/user_doc/guide/task/switch.html',
},
+ {
+ title: 'SeaTunnel',
+ link: '/zh-cn/docs/dev/user_doc/guide/task/seatunnel.html',
+ },
{
title: 'Amazon EMR',
link: '/zh-cn/docs/dev/user_doc/guide/task/emr.html',
diff --git a/docs/docs/en/guide/task/seatunnel.md b/docs/docs/en/guide/task/seatunnel.md
new file mode 100644
index 0000000000..e748360665
--- /dev/null
+++ b/docs/docs/en/guide/task/seatunnel.md
@@ -0,0 +1,82 @@
+# Apache SeaTunnel
+
+## Overview
+
+`SeaTunnel` task type for creating and executing `SeaTunnel` tasks. When the worker executes this task, it will parse the config file through the `start-seatunnel-spark.sh` or `start-seatunnel-flink.sh` command.
+Click [here](https://seatunnel.apache.org/) for more information about `Apache SeaTunnel`.
+
+## Create Task
+
+- Click Project Management -> Project Name -> Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page.
+- Drag the from the toolbar to the drawing board.
+
+## Task Parameter
+
+- Node name: The node name in a workflow definition is unique.
+- Run flag: Identifies whether this node can be scheduled normally, if it does not need to be executed, you can turn on the prohibition switch.
+- Descriptive information: describe the function of the node.
+- Task priority: When the number of worker threads is insufficient, they are executed in order from high to low, and when the priority is the same, they are executed according to the first-in first-out principle.
+- Worker grouping: Tasks are assigned to the machines of the worker group to execute. If Default is selected, a worker machine will be randomly selected for execution.
+- Environment Name: Configure the environment name in which to run the script.
+- Number of failed retry attempts: The number of times the task failed to be resubmitted.
+- Failed retry interval: The time, in cents, interval for resubmitting the task after a failed task.
+- Cpu quota: Assign the specified CPU time quota to the task executed. Takes a percentage value. Default -1 means unlimited. For example, the full CPU load of one core is 100%,and that of 16 cores is 1600%. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
+- Max memory:Assign the specified max memory to the task executed. Exceeding this limit will trigger oom to be killed and will not automatically retry. Takes an MB value. Default -1 means unlimited. This function is controlled by [task.resource.limit.state](../../architecture/configuration.md)
+- Delayed execution time: The time, in cents, that a task is delayed in execution.
+- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail.
+- Engine: Supports FLINK and SPARK
+ - FLINK
+ - Run model: supports `run` and `run-application` modes
+ - Option parameters: used to add the parameters of the Flink engine, such as `-m yarn-cluster -ynm seatunnel`
+ - SPARK
+ - Deployment mode: specify the deployment mode, `cluster` `client` `local`
+ - Master: Specify the `Master` model, `yarn` `local` `spark` `mesos`, where `spark` and `mesos` need to specify the `Master` service address, for example: 127.0.0.1:7077
+ > Click [here](https://seatunnel.apache.org/docs/2.1.2/command/usage) for more information on the usage of `Apache SeaTunnel command`
+- Custom Configuration: Supports custom configuration or select configuration file from Resource Center
+ > Click [here](https://seatunnel.apache.org/docs/2.1.2/concept/config) for more information about `Apache SeaTunnel config` file
+- Script: Customize configuration information on the task node, including four parts: `env` `source` `transform` `sink`
+- Resource file: The configuration file of the resource center can be referenced in the task node, and only one configuration file can be referenced.
+- Predecessor task: Selecting a predecessor task for the current task will set the selected predecessor task as upstream of the current task.
+
+## Task Example
+
+This sample demonstrates using the Flink engine to read data from a Fake source and print to the console.
+
+### Configuring the SeaTunnel environment in DolphinScheduler
+
+If you want to use the SeaTunnel task type in the production environment, you need to configure the required environment first. The configuration file is as follows: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
+
+![seatunnel_task01](../../../../img/tasks/demo/seatunnel_task01.png)
+
+### Configuring SeaTunnel Task Node
+
+According to the above parameter description, configure the required content.
+
+![seatunnel_task02](../../../../img/tasks/demo/seatunnel_task02.png)
+
+### Config example
+
+```Config
+
+env {
+ execution.parallelism = 1
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+}
+
+sink {
+ ConsoleSink {}
+}
+
+```
diff --git a/docs/docs/zh/guide/task/seatunnel.md b/docs/docs/zh/guide/task/seatunnel.md
new file mode 100644
index 0000000000..c15844559a
--- /dev/null
+++ b/docs/docs/zh/guide/task/seatunnel.md
@@ -0,0 +1,82 @@
+# Apache SeaTunnel
+
+## 综述
+
+`SeaTunnel` 任务类型,用于创建并执行 `SeaTunnel` 类型任务。worker 执行该任务的时候,会通过 `start-seatunnel-spark.sh` 或 `start-seatunnel-flink.sh` 命令解析 config 文件。
+点击 [这里](https://seatunnel.apache.org/) 获取更多关于 `Apache SeaTunnel` 的信息。
+
+## 创建任务
+
+- 点击项目管理 -> 项目名称 -> 工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
+- 拖动工具栏的 任务节点到画板中。
+
+## 任务参数
+
+- 节点名称:设置任务节点的名称。一个工作流定义中的节点名称是唯一的。
+- 运行标志:标识这个结点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
+- 描述:描述该节点的功能。
+- 任务优先级:worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
+- Worker 分组:任务分配给 worker 组的机器执行,选择 Default ,会随机选择一台 worker 机执行。
+- 环境名称:配置运行脚本的环境。
+- 失败重试次数:任务失败重新提交的次数。
+- 失败重试间隔:任务失败重新提交任务的时间间隔,以分为单位。
+- Cpu 配额: 为执行的任务分配指定的CPU时间配额,单位百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 最大内存:为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。这个功能由 [task.resource.limit.state](../../architecture/configuration.md) 控制
+- 延时执行时间:任务延迟执行的时间,以分为单位。
+- 超时警告:勾选超时警告、超时失败,当任务超过“超时时长”后,会发送告警邮件并且任务执行失败。
+- 引擎:支持 FLINK 和 SPARK
+ - FLINK
+ - 运行模型:支持 `run` 和 `run-application` 两种模式
+ - 选项参数:用于添加 Flink 引擎本身参数,例如 `-m yarn-cluster -ynm seatunnel`
+ - SPARK
+ - 部署方式:指定部署模式,`cluster` `client` `local`
+ - Master:指定 `Master` 模型,`yarn` `local` `spark` `mesos`,其中 `spark` 和 `mesos` 需要指定 `Master` 服务地址,例如:127.0.0.1:7077
+ > 点击 [这里](https://seatunnel.apache.org/docs/2.1.2/command/usage) 获取更多关于`Apache SeaTunnel command` 使用的信息
+- 自定义配置:支持自定义配置或从资源中心选择配置文件
+ > 点击 [这里](https://seatunnel.apache.org/docs/2.1.2/concept/config) 获取更多关于`Apache SeaTunnel config` 文件介绍
+- 脚本:在任务节点那自定义配置信息,包括四部分:`env` `source` `transform` `sink`
+- 资源文件:在任务节点引用资源中心的配置文件,只可以引用一个配置文件。
+- 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
+
+## 任务样例
+
+该样例演示为使用 Flink 引擎从 Fake 源读取数据打印到控制台。
+
+### 在 DolphinScheduler 中配置 SeaTunnel 环境
+
+若生产环境中要是使用到 SeaTunnel 任务类型,则需要先配置好所需的环境,配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
+
+![seatunnel_task01](../../../../img/tasks/demo/seatunnel_task01.png)
+
+### 配置 SeaTunnel 任务节点
+
+根据上述参数说明,配置所需的内容即可。
+
+![seatunnel_task02](../../../../img/tasks/demo/seatunnel_task02.png)
+
+### Config 样例
+
+```Config
+
+env {
+ execution.parallelism = 1
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from fake"
+ }
+}
+
+sink {
+ ConsoleSink {}
+}
+
+```
diff --git a/docs/img/tasks/demo/seatunnel_task01.png b/docs/img/tasks/demo/seatunnel_task01.png
new file mode 100644
index 0000000000..d802b95159
Binary files /dev/null and b/docs/img/tasks/demo/seatunnel_task01.png differ
diff --git a/docs/img/tasks/demo/seatunnel_task02.png b/docs/img/tasks/demo/seatunnel_task02.png
new file mode 100644
index 0000000000..0607b06318
Binary files /dev/null and b/docs/img/tasks/demo/seatunnel_task02.png differ
diff --git a/docs/img/tasks/icons/seatunnel.png b/docs/img/tasks/icons/seatunnel.png
new file mode 100644
index 0000000000..4b11920c5f
Binary files /dev/null and b/docs/img/tasks/icons/seatunnel.png differ
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java
new file mode 100644
index 0000000000..1ad25cc2cf
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.seatunnel;
+
+public class Constants {
+
+ private Constants() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static final String CONFIG_OPTIONS = "--config";
+ public static final String DEPLOY_MODE_OPTIONS = "--deploy-mode";
+ public static final String MASTER_OPTIONS = "--master";
+ public static final String QUEUE_OPTIONS = "--queue";
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java
new file mode 100644
index 0000000000..28fb7685ba
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.seatunnel;
+
+public enum DeployModeEnum {
+ cluster("cluster"),
+ client("client"),
+ local("client");
+
+ private String command;
+
+ DeployModeEnum(String command) {
+ this.command = command;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java
new file mode 100644
index 0000000000..14fc608049
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.seatunnel;
+
+public enum EngineEnum {
+
+ FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),
+ SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh");
+
+ private String command;
+
+ EngineEnum(String command) {
+ this.command = command;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelParameters.java
index 95e5b5047c..38591ba8a4 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelParameters.java
@@ -19,14 +19,30 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.flink.SeatunnelFlinkParameters;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.spark.SeatunnelSparkParameters;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
import java.util.List;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "engine")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = SeatunnelFlinkParameters.class, name = "FLINK"),
+ @JsonSubTypes.Type(value = SeatunnelSparkParameters.class, name = "SPARK")
+})
public class SeatunnelParameters extends AbstractParameters {
- /**
- * shell script
- */
+ private EngineEnum engine;
+
+ private Boolean useCustom;
+
private String rawScript;
/**
@@ -34,6 +50,22 @@ public class SeatunnelParameters extends AbstractParameters {
*/
private List resourceList;
+ public EngineEnum getEngine() {
+ return engine;
+ }
+
+ public void setEngine(EngineEnum engine) {
+ this.engine = engine;
+ }
+
+ public Boolean getUseCustom() {
+ return useCustom;
+ }
+
+ public void setUseCustom(Boolean useCustom) {
+ this.useCustom = useCustom;
+ }
+
public String getRawScript() {
return rawScript;
}
@@ -52,7 +84,9 @@ public class SeatunnelParameters extends AbstractParameters {
@Override
public boolean checkParameters() {
- return rawScript != null && !rawScript.isEmpty();
+ return Objects.nonNull(engine)
+ && ((BooleanUtils.isTrue(useCustom) && StringUtils.isNotBlank(rawScript))
+ || (BooleanUtils.isFalse(useCustom) && CollectionUtils.isNotEmpty(resourceList) && resourceList.size() == 1));
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index 2ba758d5ad..ffec399565 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
@@ -28,21 +28,20 @@ import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.SystemUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.BooleanUtils;
import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
+import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* seatunnel task
@@ -62,7 +61,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
/**
* taskExecutionContext
*/
- private TaskExecutionContext taskExecutionContext;
+ protected final TaskExecutionContext taskExecutionContext;
/**
* constructor
@@ -80,12 +79,9 @@ public class SeatunnelTask extends AbstractTaskExecutor {
@Override
public void init() {
- logger.info("seatunnel task params {}", taskExecutionContext.getTaskParams());
-
- seatunnelParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SeatunnelParameters.class);
-
+ logger.info("SeaTunnel task params {}", taskExecutionContext.getTaskParams());
if (!seatunnelParameters.checkParameters()) {
- throw new RuntimeException("seatunnel task params is not valid");
+ throw new RuntimeException("SeaTunnel task params is not valid");
}
}
@@ -100,7 +96,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
setProcessId(commandExecuteResult.getProcessId());
seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (Exception e) {
- logger.error("seatunnel task error", e);
+ logger.error("SeaTunnel task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
}
@@ -112,43 +108,61 @@ public class SeatunnelTask extends AbstractTaskExecutor {
shellCommandExecutor.cancelApplication();
}
- /**
- * create command
- *
- * @return file name
- * @throws Exception exception
- */
private String buildCommand() throws Exception {
- // generate scripts
- String fileName = String.format("%s/%s_node.%s",
- taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTaskAppId(), SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
- Path path = new File(fileName).toPath();
+ List args = new ArrayList<>();
+ args.add(seatunnelParameters.getEngine().getCommand());
+ args.addAll(buildOptions());
+
+ String command = String.join(" ", args);
+ logger.info("SeaTunnel Flink task command: {}", command);
- if (Files.exists(path)) {
- return fileName;
+ return command;
+ }
+
+ protected List buildOptions() throws Exception {
+ List args = new ArrayList<>();
+ if (BooleanUtils.isTrue(seatunnelParameters.getUseCustom())) {
+ args.add(CONFIG_OPTIONS);
+ args.add(buildCustomConfigCommand());
+ } else {
+ seatunnelParameters.getResourceList().forEach(resourceInfo -> {
+ args.add(CONFIG_OPTIONS);
+ // TODO Currently resourceName is `/xxx.sh`, it has more `/` and needs to be optimized
+ args.add(resourceInfo.getResourceName().substring(1));
+ });
}
+ return args;
+ }
+
+ protected String buildCustomConfigCommand() throws Exception {
+ String config = buildCustomConfigContent();
+ String filePath = buildConfigFilePath();
+ createConfigFileIfNotExists(config, filePath);
+ return filePath;
+ }
+
+ private String buildCustomConfigContent() {
+ logger.info("raw custom config content : {}", seatunnelParameters.getRawScript());
String script = seatunnelParameters.getRawScript().replaceAll("\\r\\n", "\n");
script = parseScript(script);
- seatunnelParameters.setRawScript(script);
-
- logger.info("raw script : {}", seatunnelParameters.getRawScript());
- logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
+ return script;
+ }
- Set perms = PosixFilePermissions.fromString(RWXR_XR_X);
- FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms);
+ private String buildConfigFilePath() {
+ return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+ }
- if (SystemUtils.IS_OS_WINDOWS) {
- Files.createFile(path);
- } else {
- Files.createFile(path, attr);
- }
+ private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException {
+ logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
- Files.write(path, seatunnelParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
+ if (!Files.exists(Paths.get(scriptFile))) {
+ logger.info("generate script file:{}", scriptFile);
- return fileName;
+ // write data to file
+ FileUtils.writeStringToFile(new File(scriptFile), script, StandardCharsets.UTF_8);
+ }
}
@Override
@@ -161,4 +175,8 @@ public class SeatunnelTask extends AbstractTaskExecutor {
Map paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
+
+ public void setSeatunnelParameters(SeatunnelParameters seatunnelParameters) {
+ this.seatunnelParameters = seatunnelParameters;
+ }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java
index 4981a90388..b91c4f1228 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java
@@ -22,6 +22,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.flink.SeatunnelFlinkTask;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.spark.SeatunnelSparkTask;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
public class SeatunnelTaskChannel implements TaskChannel {
@@ -33,7 +35,13 @@ public class SeatunnelTaskChannel implements TaskChannel {
@Override
public SeatunnelTask createTask(TaskExecutionContext taskRequest) {
- return new SeatunnelTask(taskRequest);
+ SeatunnelParameters seatunnelParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), SeatunnelParameters.class);
+ if (EngineEnum.FLINK == seatunnelParameters.getEngine()) {
+ return new SeatunnelFlinkTask(taskRequest);
+ } else if (EngineEnum.SPARK == seatunnelParameters.getEngine()) {
+ return new SeatunnelSparkTask(taskRequest);
+ }
+ throw new IllegalArgumentException("Unsupported engine type:" + seatunnelParameters.getEngine());
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkParameters.java
new file mode 100644
index 0000000000..3f0b0eee40
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkParameters.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.seatunnel.flink;
+
+import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters;
+
+public class SeatunnelFlinkParameters extends SeatunnelParameters {
+
+ private RunModeEnum runMode;
+
+ private String others;
+
+ public static enum RunModeEnum {
+ RUN("--run-mode run"),
+ RUN_APPLICATION("--run-mode run-application");
+
+ private String command;
+
+ RunModeEnum(String command) {
+ this.command = command;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+ }
+
+ public RunModeEnum getRunMode() {
+ return runMode;
+ }
+
+ public void setRunMode(RunModeEnum runMode) {
+ this.runMode = runMode;
+ }
+
+ public String getOthers() {
+ return others;
+ }
+
+ public void setOthers(String others) {
+ this.others = others;
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
new file mode 100644
index 0000000000..efaa135334
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.seatunnel.flink;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelTask;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Objects;
+
+public class SeatunnelFlinkTask extends SeatunnelTask {
+
+ private SeatunnelFlinkParameters seatunnelParameters;
+ public SeatunnelFlinkTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ }
+
+ @Override
+ public void init() {
+ seatunnelParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SeatunnelFlinkParameters.class);
+ setSeatunnelParameters(seatunnelParameters);
+ super.init();
+ }
+
+ @Override
+ public List buildOptions() throws Exception {
+ List args = super.buildOptions();
+ args.add(Objects.isNull(seatunnelParameters.getRunMode()) ? SeatunnelFlinkParameters.RunModeEnum.RUN.getCommand() : seatunnelParameters.getRunMode().getCommand());
+ if (StringUtils.isNotBlank(seatunnelParameters.getOthers())) {
+ args.add(seatunnelParameters.getOthers());
+ }
+ return args;
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkParameters.java
new file mode 100644
index 0000000000..1cc5285f39
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkParameters.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.seatunnel.spark;
+
+import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Objects;
+
+public class SeatunnelSparkParameters extends SeatunnelParameters {
+
+ private DeployModeEnum deployMode;
+ private MasterTypeEnum master;
+ private String masterUrl;
+ private String queue;
+
+ @Override
+ public boolean checkParameters() {
+ return super.checkParameters()
+ && Objects.nonNull(deployMode)
+ && (DeployModeEnum.local != deployMode && Objects.nonNull(master))
+ && (DeployModeEnum.local != deployMode && (MasterTypeEnum.SPARK == master || MasterTypeEnum.MESOS == master) && StringUtils.isNotBlank(masterUrl));
+ }
+
+ public static enum MasterTypeEnum {
+ YARN("yarn"),
+ LOCAL("local"),
+ SPARK("spark://"),
+ MESOS("mesos://");
+
+ private String command;
+
+ MasterTypeEnum(String command) {
+ this.command = command;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+ }
+
+ public DeployModeEnum getDeployMode() {
+ return deployMode;
+ }
+
+ public void setDeployMode(DeployModeEnum deployMode) {
+ this.deployMode = deployMode;
+ }
+
+ public MasterTypeEnum getMaster() {
+ return master;
+ }
+
+ public void setMaster(MasterTypeEnum master) {
+ this.master = master;
+ }
+
+ public String getMasterUrl() {
+ return masterUrl;
+ }
+
+ public void setMasterUrl(String masterUrl) {
+ this.masterUrl = masterUrl;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
new file mode 100644
index 0000000000..842dd05cbf
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkTask.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.seatunnel.spark;
+
+import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.DEPLOY_MODE_OPTIONS;
+import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.MASTER_OPTIONS;
+import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.QUEUE_OPTIONS;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelTask;
+import org.apache.dolphinscheduler.plugin.task.seatunnel.spark.SeatunnelSparkParameters.MasterTypeEnum;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+public class SeatunnelSparkTask extends SeatunnelTask {
+
+ private SeatunnelSparkParameters seatunnelParameters;
+ /**
+ * constructor
+ *
+ * @param taskExecutionContext taskExecutionContext
+ */
+ public SeatunnelSparkTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ }
+
+ @Override
+ public void init() {
+ seatunnelParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SeatunnelSparkParameters.class);
+ setSeatunnelParameters(seatunnelParameters);
+ super.init();
+ }
+
+ @Override
+ public List buildOptions() throws Exception {
+ List args = super.buildOptions();
+ args.add(DEPLOY_MODE_OPTIONS);
+ args.add(seatunnelParameters.getDeployMode().getCommand());
+
+ MasterTypeEnum master = DeployModeEnum.local == seatunnelParameters.getDeployMode() ? MasterTypeEnum.LOCAL : seatunnelParameters.getMaster();
+
+ args.add(MASTER_OPTIONS);
+ args.add(master.getCommand());
+ if (MasterTypeEnum.SPARK.equals(master) || MasterTypeEnum.MESOS.equals(master)) {
+ args.add(seatunnelParameters.getMasterUrl());
+ }
+
+ if (StringUtils.isNotBlank(seatunnelParameters.getQueue())) {
+ args.add(QUEUE_OPTIONS);
+ args.add(seatunnelParameters.getQueue());
+ }
+ return args;
+ }
+}
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index b5514c8a75..cf2cef1271 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -352,6 +352,7 @@ export default {
init_script_tips: 'Please enter initialization script',
resources: 'Resources',
resources_tips: 'Please select resources',
+ resources_limit_tips: 'Please select again, resource limit:',
non_resources_tips: 'Please delete all non-existent resources',
useless_resources_tips: 'Unauthorized or deleted resources',
custom_parameters: 'Custom Parameters',
@@ -707,6 +708,10 @@ export default {
please_enter_threshold_number_is_needed:
'Please enter threshold number is needed',
please_enter_comparison_title: 'please select comparison title',
+ custom_config: 'Custom Config',
+ engine: 'engine',
+ engine_tips: 'Please select engine',
+ run_mode: 'Run Mode',
dinky_address: 'Dinky address',
dinky_address_tips: 'Please enter the url of your dinky',
dinky_task_id: 'Dinky task id',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 1def17bf06..ab2c9c71f4 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -348,6 +348,7 @@ export default {
init_script_tips: '请输入初始化脚本',
resources: '资源',
resources_tips: '请选择资源',
+ resources_limit_tips: '请重新选择,资源个数限制:',
no_resources_tips: '请删除所有未授权或已删除资源',
useless_resources_tips: '未授权或已删除资源',
custom_parameters: '自定义参数',
@@ -693,6 +694,10 @@ export default {
please_enter_column_only_single_column_is_supported: '请选择源表检测列',
please_enter_threshold_number_is_needed: '请输入阈值',
please_enter_comparison_title: '请选择期望值类型',
+ custom_config: '自定义配置',
+ engine: '引擎',
+ engine_tips: '请选择引擎',
+ run_mode: '运行模式',
dinky_address: 'dinky 地址',
dinky_address_tips: '请输入 Dinky 地址',
dinky_task_id: 'dinky 作业ID',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-deploy-mode.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-deploy-mode.ts
index a767d084c0..633523d830 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-deploy-mode.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-deploy-mode.ts
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import { ref, watchEffect } from 'vue'
+import { Ref, ref, watchEffect } from 'vue'
import { useI18n } from 'vue-i18n'
import type { IJsonItem, IOption } from '../types'
export function useDeployMode(
- span = 24,
+ span: number | Ref = 24,
showClient = ref(true),
showCluster = ref(true)
): IJsonItem {
@@ -44,7 +44,7 @@ export function useDeployMode(
field: 'deployMode',
name: t('project.node.deploy_mode'),
options: deployModeOptions,
- span
+ span: span
}
}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-resources.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-resources.ts
index b9df51eefa..a9c979d3cc 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-resources.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-resources.ts
@@ -15,14 +15,18 @@
* limitations under the License.
*/
-import { ref, onMounted } from 'vue'
+import { ref, onMounted, Ref } from 'vue'
import { useI18n } from 'vue-i18n'
import { queryResourceList } from '@/service/modules/resources'
import { useTaskNodeStore } from '@/store/project/task-node'
import utils from '@/utils'
import type { IJsonItem, IResource } from '../types'
-export function useResources(): IJsonItem {
+export function useResources(
+ span: number | Ref = 24,
+ required = false,
+ limit: number | Ref = -1
+): IJsonItem {
const { t } = useI18n()
const resourcesOptions = ref([] as IResource[])
@@ -52,6 +56,7 @@ export function useResources(): IJsonItem {
type: 'tree-select',
field: 'resourceList',
name: t('project.node.resources'),
+ span: span,
options: resourcesOptions,
props: {
multiple: true,
@@ -63,6 +68,21 @@ export function useResources(): IJsonItem {
keyField: 'id',
labelField: 'name',
loading: resourcesLoading
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: required,
+ validator(validate: any, value: IResource[]) {
+ if (required) {
+ if (!value) {
+ return new Error(t('project.node.resources_tips'))
+ }
+
+ if (limit > 0 && value.length > limit) {
+ return new Error(t('project.node.resources_limit_tips') + limit)
+ }
+ }
+ }
}
}
}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts
index d0a047372f..1c35dffa94 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import { watch, computed } from 'vue'
+import { computed } from 'vue'
import { useI18n } from 'vue-i18n'
import { useDeployMode, useResources, useCustomParams } from '.'
import type { IJsonItem } from '../types'
@@ -22,80 +22,64 @@ import type { IJsonItem } from '../types'
export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
- const masterTypeOptions = [
+ const configEditorSpan = computed(() => (model.useCustom ? 24 : 0))
+ const resourceEditorSpan = computed(() => (model.useCustom ? 0 : 24))
+ const flinkSpan = computed(() => (model.engine === 'FLINK' ? 24 : 0))
+ const deployModeSpan = computed(() => (model.engine === 'SPARK' ? 24 : 0))
+ const masterSpan = computed(() =>
+ model.engine === 'SPARK' && model.deployMode !== 'local' ? 12 : 0
+ )
+ const masterUrlSpan = computed(() =>
+ model.engine === 'SPARK' &&
+ model.deployMode !== 'local' &&
+ (model.master === 'SPARK' || model.master === 'MESOS')
+ ? 12
+ : 0
+ )
+ const queueSpan = computed(() =>
+ model.engine === 'SPARK' &&
+ model.deployMode != 'local' &&
+ model.master === 'YARN'
+ ? 24
+ : 0
+ )
+
+ return [
{
- label: 'yarn',
- value: 'yarn'
+ type: 'select',
+ field: 'engine',
+ span: 12,
+ name: t('project.node.engine'),
+ options: ENGINE,
+ validate: {
+ trigger: ['input', 'blur'],
+ required: true,
+ message: t('project.node.engine_tips')
+ }
},
+
+ // SeaTunnel flink parameter
{
- label: 'local',
- value: 'local'
+ type: 'select',
+ field: 'runMode',
+ name: t('project.node.run_mode'),
+ options: FLINK_RUN_MODE,
+ value: model.runMode,
+ span: flinkSpan
},
{
- label: 'spark://',
- value: 'spark://'
+ type: 'input',
+ field: 'others',
+ name: t('project.node.option_parameters'),
+ span: flinkSpan,
+ props: {
+ type: 'textarea',
+ placeholder: t('project.node.option_parameters_tips')
+ }
},
- {
- label: 'mesos://',
- value: 'mesos://'
- }
- ]
- const queueOptions = [
- {
- label: 'default',
- value: 'default'
- }
- ]
-
- const masterSpan = computed(() => (model.deployMode === 'local' ? 0 : 12))
- const queueSpan = computed(() =>
- model.deployMode === 'local' || model.master != 'yarn' ? 0 : 12
- )
- const masterUrlSpan = computed(() =>
- model.deployMode === 'local' ||
- (model.master != 'spark://' && model.master != 'mesos://')
- ? 0
- : 12
- )
-
- const baseScript = 'sh ${WATERDROP_HOME}/bin/start-waterdrop.sh'
-
- const parseRawScript = () => {
- if (model.rawScript) {
- model.rawScript.split('\n').forEach((script: string) => {
- const params = script.replace(baseScript, '').split('--')
- params?.forEach((param: string) => {
- const pair = param.split(' ')
- if (pair && pair.length >= 2) {
- if (pair[0] === 'master') {
- const prefix = pair[1].substring(0, 8)
- if (pair[1] && (prefix === 'mesos://' || prefix === 'spark://')) {
- model.master = prefix
- model.masterUrl = pair[1].substring(8, pair[1].length)
- } else {
- model.master = pair[1]
- }
- } else if (pair[0] === 'deploy-mode') {
- model.deployMode = pair[1]
- } else if (pair[0] === 'queue') {
- model.queue = pair[1]
- }
- }
- })
- })
- }
- }
-
- watch(
- () => model.rawScript,
- () => {
- parseRawScript()
- }
- )
-
- return [
- useDeployMode(),
+ // SeaTunnel spark parameter
+ useDeployMode(deployModeSpan),
{
type: 'select',
field: 'master',
@@ -112,17 +96,88 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] {
span: masterUrlSpan,
props: {
placeholder: t('project.node.sea_tunnel_master_url_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: masterUrlSpan.value !== 0,
+ validator(validate: any, value: string) {
+ if (masterUrlSpan.value !== 0 && !value) {
+ return new Error(t('project.node.sea_tunnel_master_url_tips'))
+ }
+ }
}
},
{
- type: 'select',
+ type: 'input',
field: 'queue',
name: t('project.node.sea_tunnel_queue'),
- options: queueOptions,
value: model.queue,
span: queueSpan
},
- useResources(),
+
+ // SeaTunnel config parameter
+ {
+ type: 'switch',
+ field: 'useCustom',
+ name: t('project.node.custom_config')
+ },
+ {
+ type: 'editor',
+ field: 'rawScript',
+ name: t('project.node.script'),
+ span: configEditorSpan,
+ validate: {
+ trigger: ['input', 'trigger'],
+ required: model.useCustom,
+ validator(validate: any, value: string) {
+ if (model.useCustom && !value) {
+ return new Error(t('project.node.script_tips'))
+ }
+ }
+ }
+ },
+ useResources(resourceEditorSpan, true, 1),
...useCustomParams({ model, field: 'localParams', isSimple: true })
]
}
+
+export const ENGINE = [
+ {
+ label: 'SPARK',
+ value: 'SPARK'
+ },
+ {
+ label: 'FLINK',
+ value: 'FLINK'
+ }
+]
+
+export const FLINK_RUN_MODE = [
+ {
+ label: 'run',
+ value: 'RUN'
+ },
+ {
+ label: 'run-application',
+ value: 'RUN_APPLICATION'
+ }
+]
+
+export const masterTypeOptions = [
+ {
+ label: 'yarn',
+ value: 'YARN'
+ },
+ {
+ label: 'local',
+ value: 'LOCAL'
+ },
+ {
+ label: 'spark://',
+ value: 'SPARK'
+ },
+ {
+ label: 'mesos://',
+ value: 'MESOS'
+ }
+]
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index aea0097295..c90d1660db 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-import { find, omit, cloneDeep } from 'lodash'
+import { omit, cloneDeep } from 'lodash'
import type {
INodeData,
ITaskData,
@@ -199,12 +199,23 @@ export function formatParams(data: INodeData): {
}
if (data.taskType === 'SEATUNNEL') {
- if (data.deployMode === 'local') {
- data.master = 'local'
- data.masterUrl = ''
- data.deployMode = 'client'
+ taskParams.engine = data.engine
+ taskParams.useCustom = data.useCustom
+ taskParams.rawScript = data.rawScript
+ switch (data.engine) {
+ case 'FLINK':
+ taskParams.runMode = data.runMode
+ taskParams.others = data.others
+ break
+ case 'SPARK':
+ taskParams.deployMode = data.deployMode
+ taskParams.master = data.master
+ taskParams.masterUrl = data.masterUrl
+ taskParams.queue = data.queue
+ break
+ default:
+ break
}
- buildRawScript(data)
}
if (data.taskType === 'SWITCH') {
@@ -623,49 +634,3 @@ export function formatModel(data: ITaskData) {
}
return params
}
-
-const buildRawScript = (model: INodeData) => {
- const baseScript = 'sh ${WATERDROP_HOME}/bin/start-waterdrop.sh'
- if (!model.resourceList) return
-
- let master = model.master
- let masterUrl = model?.masterUrl ? model?.masterUrl : ''
- let deployMode = model.deployMode
- const queue = model.queue
-
- if (model.deployMode === 'local') {
- master = 'local'
- masterUrl = ''
- deployMode = 'client'
- }
-
- if (master === 'yarn' || master === 'local') {
- masterUrl = ''
- }
-
- let localParams = ''
- model?.localParams?.forEach((param: any) => {
- localParams = localParams + ' --variable ' + param.prop + '=' + param.value
- })
-
- let rawScript = ''
- model.resourceList?.forEach((id: number) => {
- const item = find(model.resourceFiles, { id: id })
-
- rawScript =
- rawScript +
- baseScript +
- ' --master ' +
- master +
- masterUrl +
- ' --deploy-mode ' +
- deployMode +
- ' --queue ' +
- queue
- if (item && item.fullName) {
- rawScript = rawScript + ' --config ' + item.fullName
- }
- rawScript = rawScript + localParams + ' \n'
- })
- model.rawScript = rawScript ? rawScript : ''
-}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
index a994d217ef..c5fd116ec0 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
@@ -46,12 +46,36 @@ export function useSeaTunnel({
memoryMax: -1,
delayTime: 0,
timeout: 30,
+ engine: 'FLINK',
+ runMode: 'RUN',
+ useCustom: true,
deployMode: 'client',
queue: 'default',
- master: 'yarn',
+ master: 'YARN',
masterUrl: '',
resourceFiles: [],
- timeoutNotifyStrategy: ['WARN']
+ timeoutNotifyStrategy: ['WARN'],
+ rawScript:
+ 'env {\n' +
+ ' execution.parallelism = 1\n' +
+ '}\n' +
+ '\n' +
+ 'source {\n' +
+ ' FakeSourceStream {\n' +
+ ' result_table_name = "fake"\n' +
+ ' field_name = "name,age"\n' +
+ ' }\n' +
+ '}\n' +
+ '\n' +
+ 'transform {\n' +
+ ' sql {\n' +
+ ' sql = "select name,age from fake"\n' +
+ ' }\n' +
+ '}\n' +
+ '\n' +
+ 'sink {\n' +
+ ' ConsoleSink {}\n' +
+ '}'
} as INodeData)
let extra: IJsonItem[] = []
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 06a5c4adfe..5019f29ce1 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -272,6 +272,7 @@ interface ITaskParams {
sourceParams?: string
queue?: string
master?: string
+ masterUrl?: string
switchResult?: ISwitchResult
dependTaskList?: IDependTask[]
nextNode?: number
@@ -341,6 +342,8 @@ interface ITaskParams {
zk?: string
zkPath?: string
executeMode?: string
+ useCustom?: boolean
+ runMode?: string
dvcTaskType?: string
dvcRepository?: string
dvcVersion?: string
@@ -397,7 +400,6 @@ interface INodeData
timeoutSetting?: boolean
isCustomTask?: boolean
method?: string
- masterUrl?: string
resourceFiles?: { id: number; fullName: string }[] | null
relation?: RelationType
definition?: object
diff --git a/script/env/dolphinscheduler_env.sh b/script/env/dolphinscheduler_env.sh
index 6e9d3aa78f..44f0de7792 100755
--- a/script/env/dolphinscheduler_env.sh
+++ b/script/env/dolphinscheduler_env.sh
@@ -43,5 +43,6 @@ export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
+export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel}
-export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
+export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$PATH