From 4439c1ec9096bd4ae3a7e4ad7edb939992a4f7ce Mon Sep 17 00:00:00 2001 From: Assert <42203474+shangeyao@users.noreply.github.com> Date: Sat, 8 Apr 2023 22:02:19 +0800 Subject: [PATCH] [Feature-13052][Task Plugin] Support Apache Seatunnel Connector-V2 (#13086) --- docs/docs/en/guide/task/seatunnel.md | 8 ++- docs/docs/zh/guide/task/seatunnel.md | 8 ++- .../plugin/task/seatunnel/DeployModeEnum.java | 2 +- .../plugin/task/seatunnel/EngineEnum.java | 8 ++- .../task/seatunnel/SeatunnelTaskChannel.java | 14 +++-- .../self/SeatunnelEngineParameters.java | 44 ++++++++++++++ .../seatunnel/self/SeatunnelEngineTask.java | 58 +++++++++++++++++++ .../components/node/fields/use-deploy-mode.ts | 5 +- .../components/node/fields/use-sea-tunnel.ts | 29 +++++++--- 9 files changed, 156 insertions(+), 20 deletions(-) create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineParameters.java create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java diff --git a/docs/docs/en/guide/task/seatunnel.md b/docs/docs/en/guide/task/seatunnel.md index 514654521f..4a3f1a6a4d 100644 --- a/docs/docs/en/guide/task/seatunnel.md +++ b/docs/docs/en/guide/task/seatunnel.md @@ -2,7 +2,7 @@ ## Overview -`SeaTunnel` task type for creating and executing `SeaTunnel` tasks. When the worker executes this task, it will parse the config file through the `start-seatunnel-spark.sh` or `start-seatunnel-flink.sh` command. +`SeaTunnel` task type for creating and executing `SeaTunnel` tasks. When the worker executes this task, it will parse the config file through the `start-seatunnel-spark.sh` , `start-seatunnel-flink.sh` or `seatunnel.sh` command. Click [here](https://seatunnel.apache.org/) for more information about `Apache SeaTunnel`. ## Create Task @@ -16,13 +16,15 @@ Click [here](https://seatunnel.apache.org/) for more information about `Apache S [//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md#default-task-parameters) `Default Task Parameters` section for default parameters.) - Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters. -- Engine: Supports FLINK and SPARK +- Engine: Supports FLINK, SPARK AND SEATUNNEL_ENGINE - FLINK - Run model: supports `run` and `run-application` modes - Option parameters: used to add the parameters of the Flink engine, such as `-m yarn-cluster -ynm seatunnel` - SPARK -- Deployment mode: specify the deployment mode, `cluster` `client` `local` +- Deployment mode: specify the deployment mode, `cluster` `client` - Master: Specify the `Master` model, `yarn` `local` `spark` `mesos`, where `spark` and `mesos` need to specify the `Master` service address, for example: 127.0.0.1:7077 +- SEATUNNEL_ENGINE +- Deployment mode: specify the deployment mode, `cluster` `local` > Click [here](https://seatunnel.apache.org/docs/2.1.2/command/usage) for more information on the usage of `Apache SeaTunnel command` diff --git a/docs/docs/zh/guide/task/seatunnel.md b/docs/docs/zh/guide/task/seatunnel.md index 2a7080c2b2..c041e4b166 100644 --- a/docs/docs/zh/guide/task/seatunnel.md +++ b/docs/docs/zh/guide/task/seatunnel.md @@ -2,7 +2,7 @@ ## 综述 -`SeaTunnel` 任务类型,用于创建并执行 `SeaTunnel` 类型任务。worker 执行该任务的时候,会通过 `start-seatunnel-spark.sh` 或 `start-seatunnel-flink.sh` 命令解析 config 文件。 +`SeaTunnel` 任务类型,用于创建并执行 `SeaTunnel` 类型任务。worker 执行该任务的时候,会通过 `start-seatunnel-spark.sh` 、 `start-seatunnel-flink.sh` 和 `seatunnel.sh` 命令解析 config 文件。 点击 [这里](https://seatunnel.apache.org/) 获取更多关于 `Apache SeaTunnel` 的信息。 ## 创建任务 @@ -16,13 +16,15 @@ [//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。) - 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。 -- 引擎:支持 FLINK 和 SPARK +- 引擎:支持 FLINK 、 SPARK 和 SEATUNNEL_ENGINE - FLINK - 运行模型:支持 `run` 和 `run-application` 两种模式 - 选项参数:用于添加 Flink 引擎本身参数,例如 `-m yarn-cluster -ynm seatunnel` - SPARK -- 部署方式:指定部署模式,`cluster` `client` `local` +- 部署方式:指定部署模式,`cluster` `client` - Master:指定 `Master` 模型,`yarn` `local` `spark` `mesos`,其中 `spark` 和 `mesos` 需要指定 `Master` 服务地址,例如:127.0.0.1:7077 +- SEATUNNEL_ENGINE +- 部署方式:指定部署模式,`cluster` `local` > 点击 [这里](https://seatunnel.apache.org/docs/2.1.2/command/usage) 获取更多关于`Apache SeaTunnel command` 使用的信息 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java index 0198877354..ffc331260e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java @@ -21,7 +21,7 @@ public enum DeployModeEnum { cluster("cluster"), client("client"), - local("client"); + local("local"); private String command; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java index 14fc608049..3634436191 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java @@ -20,7 +20,13 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel; public enum EngineEnum { FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"), - SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh"); + SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh"), + + FLINK_V2("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-connector-v2.sh"), + + SPARK_V2("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-connector-v2.sh"), + + SEATUNNEL_ENGINE("${SEATUNNEL_HOME}/bin/seatunnel.sh"); private String command; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java index 6303d97010..c1a6f9a86a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.plugin.task.seatunnel.flink.SeatunnelFlinkTask; +import org.apache.dolphinscheduler.plugin.task.seatunnel.self.SeatunnelEngineTask; import org.apache.dolphinscheduler.plugin.task.seatunnel.spark.SeatunnelSparkTask; public class SeatunnelTaskChannel implements TaskChannel { @@ -37,10 +38,15 @@ public class SeatunnelTaskChannel implements TaskChannel { public SeatunnelTask createTask(TaskExecutionContext taskRequest) { SeatunnelParameters seatunnelParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), SeatunnelParameters.class); - if (EngineEnum.FLINK == seatunnelParameters.getEngine()) { - return new SeatunnelFlinkTask(taskRequest); - } else if (EngineEnum.SPARK == seatunnelParameters.getEngine()) { - return new SeatunnelSparkTask(taskRequest); + switch (seatunnelParameters.getEngine()) { + case FLINK: + case FLINK_V2: + return new SeatunnelFlinkTask(taskRequest); + case SPARK: + case SPARK_V2: + return new SeatunnelSparkTask(taskRequest); + case SEATUNNEL_ENGINE: + return new SeatunnelEngineTask(taskRequest); } throw new IllegalArgumentException("Unsupported engine type:" + seatunnelParameters.getEngine()); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineParameters.java new file mode 100644 index 0000000000..0a43dc6f7e --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineParameters.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.seatunnel.self; + +import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum; +import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters; + +public class SeatunnelEngineParameters extends SeatunnelParameters { + + private DeployModeEnum deployMode; + + private String others; + + public DeployModeEnum getDeployMode() { + return deployMode; + } + + public void setDeployMode(DeployModeEnum deployMode) { + this.deployMode = deployMode; + } + + public String getOthers() { + return others; + } + + public void setOthers(String others) { + this.others = others; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java new file mode 100644 index 0000000000..1507653da9 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineTask.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.seatunnel.self; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.seatunnel.Constants; +import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelTask; + +import org.apache.commons.lang3.StringUtils; + +import java.util.List; +import java.util.Objects; + +public class SeatunnelEngineTask extends SeatunnelTask { + + private SeatunnelEngineParameters seatunnelParameters; + public SeatunnelEngineTask(TaskExecutionContext taskExecutionContext) { + super(taskExecutionContext); + } + + @Override + public void init() { + seatunnelParameters = + JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SeatunnelEngineParameters.class); + setSeatunnelParameters(seatunnelParameters); + super.init(); + } + + @Override + public List buildOptions() throws Exception { + List args = super.buildOptions(); + if (!Objects.isNull(seatunnelParameters.getDeployMode())) { + args.add(Constants.DEPLOY_MODE_OPTIONS); + args.add(seatunnelParameters.getDeployMode().getCommand()); + } + if (StringUtils.isNotBlank(seatunnelParameters.getOthers())) { + args.add(seatunnelParameters.getOthers()); + } + return args; + } + +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-deploy-mode.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-deploy-mode.ts index 633523d830..81587b96f0 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-deploy-mode.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-deploy-mode.ts @@ -21,7 +21,8 @@ import type { IJsonItem, IOption } from '../types' export function useDeployMode( span: number | Ref = 24, showClient = ref(true), - showCluster = ref(true) + showCluster = ref(true), + showLocal = ref(true) ): IJsonItem { const { t } = useI18n() @@ -34,6 +35,8 @@ export function useDeployMode( return showCluster.value case 'client': return showClient.value + case 'local': + return showLocal.value default: return true } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts index 6869f83070..4485d97aad 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { computed } from 'vue' +import { computed, ref } from 'vue' import { useI18n } from 'vue-i18n' import { useDeployMode, useResources, useCustomParams } from '.' import type { IJsonItem } from '../types' @@ -24,18 +24,21 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] { const configEditorSpan = computed(() => (model.useCustom ? 24 : 0)) const resourceEditorSpan = computed(() => (model.useCustom ? 0 : 24)) - const flinkSpan = computed(() => (model.engine === 'FLINK' ? 24 : 0)) - const deployModeSpan = computed(() => (model.engine === 'SPARK' ? 24 : 0)) + const flinkSpan = computed(() => (model.engine === 'FLINK' || model.engine === 'FLINK_V2' ? 24 : 0)) + const deployModeSpan = computed(() => (model.engine === 'SPARK' || model.engine === 'SPARK_V2' || model.engine === "SEATUNNEL_ENGINE" ? 24 : 0)) const masterSpan = computed(() => - model.engine === 'SPARK' && model.deployMode !== 'local' ? 12 : 0 + (model.engine === 'SPARK' || model.engine === 'SPARK_V2') && model.deployMode !== 'local' ? 12 : 0 ) const masterUrlSpan = computed(() => - model.engine === 'SPARK' && + (model.engine === 'SPARK' || model.engine === 'SPARK_V2') && model.deployMode !== 'local' && (model.master === 'SPARK' || model.master === 'MESOS') ? 12 : 0 ) + const showClient = computed(() => model.engine === 'SPARK' || model.engine === 'SPARK_V2') + const showLocal = computed(() => model.engine === 'SEATUNNEL_ENGINE') + const othersSpan = computed(() => (model.engine === 'FLINK' || model.engine === 'FLINK_V2' || model.engine === 'SEATUNNEL_ENGINE' ? 24 : 0)) return [ { @@ -64,7 +67,7 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] { type: 'input', field: 'others', name: t('project.node.option_parameters'), - span: flinkSpan, + span: othersSpan, props: { type: 'textarea', placeholder: t('project.node.option_parameters_tips') @@ -72,7 +75,7 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] { }, // SeaTunnel spark parameter - useDeployMode(deployModeSpan), + useDeployMode(deployModeSpan, showClient, ref(true), showLocal), { type: 'select', field: 'master', @@ -135,6 +138,18 @@ export const ENGINE = [ { label: 'FLINK', value: 'FLINK' + }, + { + label: 'SPARK_V2', + value: 'SPARK_V2' + }, + { + label: 'FLINK_V2', + value: 'FLINK_V2' + }, + { + label: 'SEATUNNEL_ENGINE', + value: 'SEATUNNEL_ENGINE' } ]