Browse Source

fix: Save task seatunnel error (#14129)

3.2.0-release
Jay Chung 1 year ago committed by GitHub
parent
commit
56b0f91f6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      docs/docs/en/guide/task/seatunnel.md
  2. 2
      docs/docs/zh/guide/task/seatunnel.md
  3. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java
  4. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java
  5. 40
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java
  6. 51
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelParameters.java
  7. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  8. 25
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java
  9. 32
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkParameters.java
  10. 25
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineParameters.java
  11. 40
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkParameters.java
  12. 2
      dolphinscheduler-ui/src/locales/en_US/project.ts
  13. 2
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  14. 78
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts
  15. 26
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  16. 44
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
  17. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

2
docs/docs/en/guide/task/seatunnel.md

@ -16,7 +16,7 @@ Click [here](https://seatunnel.apache.org/) for more information about `Apache S
[//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md#default-task-parameters) `Default Task Parameters` section for default parameters.) [//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md#default-task-parameters) `Default Task Parameters` section for default parameters.)
- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters. - Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
- Engine: Supports FLINK, SPARK AND SEATUNNEL_ENGINE - Startup script: Select script name to start the task, including `seatunnel.sh`, `start-seatunnel-flink-13-connector-v2.sh`, `start-seatunnel-flink-15-connector-v2.sh`, `start-seatunnel-flink-connector-v2.sh`, `start-seatunnel-flink.sh`, `start-seatunnel-spark-2-connector-v2.sh`, `start-seatunnel-spark-3-connector-v2.sh`, `start-seatunnel-spark-connector-v2.sh`, `start-seatunnel-spark.sh`
- FLINK - FLINK
- Run model: supports `run` and `run-application` modes - Run model: supports `run` and `run-application` modes
- Option parameters: used to add the parameters of the Flink engine, such as `-m yarn-cluster -ynm seatunnel` - Option parameters: used to add the parameters of the Flink engine, such as `-m yarn-cluster -ynm seatunnel`

2
docs/docs/zh/guide/task/seatunnel.md

@ -16,7 +16,7 @@
[//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。) [//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。)
- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。 - 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
- 引擎:支持 FLINK 、 SPARK 和 SEATUNNEL_ENGINE - 启动脚本:选择你想要运行任务的启动脚本,包括 `seatunnel.sh`, `start-seatunnel-flink-13-connector-v2.sh`, `start-seatunnel-flink-15-connector-v2.sh`, `start-seatunnel-flink-connector-v2.sh`, `start-seatunnel-flink.sh`, `start-seatunnel-spark-2-connector-v2.sh`, `start-seatunnel-spark-3-connector-v2.sh`, `start-seatunnel-spark-connector-v2.sh`, `start-seatunnel-spark.sh`
- FLINK - FLINK
- 运行模型:支持 `run``run-application` 两种模式 - 运行模型:支持 `run``run-application` 两种模式
- 选项参数:用于添加 Flink 引擎本身参数,例如 `-m yarn-cluster -ynm seatunnel` - 选项参数:用于添加 Flink 引擎本身参数,例如 `-m yarn-cluster -ynm seatunnel`

4
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java

@ -26,6 +26,8 @@ public class Constants {
public static final String CONFIG_OPTIONS = "--config"; public static final String CONFIG_OPTIONS = "--config";
public static final String DEPLOY_MODE_OPTIONS = "--deploy-mode"; public static final String DEPLOY_MODE_OPTIONS = "--deploy-mode";
public static final String MASTER_OPTIONS = "--master"; public static final String MASTER_OPTIONS = "--master";
public static final String QUEUE_OPTIONS = "--queue"; public static final String STARTUP_SCRIPT_SPARK = "spark";
public static final String STARTUP_SCRIPT_FLINK = "flink";
public static final String STARTUP_SCRIPT_SEATUNNEL = "seatunnel";
} }

10
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/DeployModeEnum.java

@ -17,19 +17,19 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel; package org.apache.dolphinscheduler.plugin.task.seatunnel;
import lombok.Getter;
@Getter
public enum DeployModeEnum { public enum DeployModeEnum {
// TODO: use upper case, have to change both frontend and backend code, currently frontend use lower case
cluster("cluster"), cluster("cluster"),
client("client"), client("client"),
local("local"); local("local");
private String command; private final String command;
DeployModeEnum(String command) { DeployModeEnum(String command) {
this.command = command; this.command = command;
} }
public String getCommand() {
return command;
}
} }

40
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/EngineEnum.java

@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;
public enum EngineEnum {
FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),
SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh"),
FLINK_V2("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-connector-v2.sh"),
SPARK_V2("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-connector-v2.sh"),
SEATUNNEL_ENGINE("${SEATUNNEL_HOME}/bin/seatunnel.sh");
private String command;
EngineEnum(String command) {
this.command = command;
}
public String getCommand() {
return command;
}
}

51
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelParameters.java

@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.seatunnel.flink.SeatunnelFlinkParameters;
import org.apache.dolphinscheduler.plugin.task.seatunnel.spark.SeatunnelSparkParameters;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.BooleanUtils;
@ -29,17 +27,16 @@ import org.apache.commons.lang3.StringUtils;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonSubTypes; import lombok.Getter;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.NoArgsConstructor;
import lombok.Setter;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "engine") @Getter
@JsonSubTypes({ @Setter
@JsonSubTypes.Type(value = SeatunnelFlinkParameters.class, name = "FLINK"), @NoArgsConstructor
@JsonSubTypes.Type(value = SeatunnelSparkParameters.class, name = "SPARK")
})
public class SeatunnelParameters extends AbstractParameters { public class SeatunnelParameters extends AbstractParameters {
private EngineEnum engine; private String startupScript;
private Boolean useCustom; private Boolean useCustom;
@ -50,41 +47,9 @@ public class SeatunnelParameters extends AbstractParameters {
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
public EngineEnum getEngine() {
return engine;
}
public void setEngine(EngineEnum engine) {
this.engine = engine;
}
public Boolean getUseCustom() {
return useCustom;
}
public void setUseCustom(Boolean useCustom) {
this.useCustom = useCustom;
}
public String getRawScript() {
return rawScript;
}
public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return Objects.nonNull(engine) return Objects.nonNull(startupScript)
&& ((BooleanUtils.isTrue(useCustom) && StringUtils.isNotBlank(rawScript)) && ((BooleanUtils.isTrue(useCustom) && StringUtils.isNotBlank(rawScript))
|| (BooleanUtils.isFalse(useCustom) && CollectionUtils.isNotEmpty(resourceList) || (BooleanUtils.isFalse(useCustom) && CollectionUtils.isNotEmpty(resourceList)
&& resourceList.size() == 1)); && resourceList.size() == 1));

4
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -51,6 +51,8 @@ import java.util.Map;
*/ */
public class SeatunnelTask extends AbstractRemoteTask { public class SeatunnelTask extends AbstractRemoteTask {
private static final String SEATUNNEL_BIN_DIR = "${SEATUNNEL_HOME}/bin/";
/** /**
* seatunnel parameters * seatunnel parameters
*/ */
@ -139,7 +141,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
private String buildCommand() throws Exception { private String buildCommand() throws Exception {
List<String> args = new ArrayList<>(); List<String> args = new ArrayList<>();
args.add(seatunnelParameters.getEngine().getCommand()); args.add(SEATUNNEL_BIN_DIR + seatunnelParameters.getStartupScript());
args.addAll(buildOptions()); args.addAll(buildOptions());
String command = String.join(" ", args); String command = String.join(" ", args);

25
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java

@ -17,6 +17,10 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel; package org.apache.dolphinscheduler.plugin.task.seatunnel;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.STARTUP_SCRIPT_FLINK;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.STARTUP_SCRIPT_SEATUNNEL;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.STARTUP_SCRIPT_SPARK;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -38,17 +42,18 @@ public class SeatunnelTaskChannel implements TaskChannel {
public SeatunnelTask createTask(TaskExecutionContext taskRequest) { public SeatunnelTask createTask(TaskExecutionContext taskRequest) {
SeatunnelParameters seatunnelParameters = SeatunnelParameters seatunnelParameters =
JSONUtils.parseObject(taskRequest.getTaskParams(), SeatunnelParameters.class); JSONUtils.parseObject(taskRequest.getTaskParams(), SeatunnelParameters.class);
switch (seatunnelParameters.getEngine()) { assert seatunnelParameters != null;
case FLINK: String startupScript = seatunnelParameters.getStartupScript();
case FLINK_V2: if (startupScript.contains(STARTUP_SCRIPT_SPARK)) {
return new SeatunnelFlinkTask(taskRequest); return new SeatunnelSparkTask(taskRequest);
case SPARK: }
case SPARK_V2: if (startupScript.contains(STARTUP_SCRIPT_FLINK)) {
return new SeatunnelSparkTask(taskRequest); return new SeatunnelFlinkTask(taskRequest);
case SEATUNNEL_ENGINE: }
return new SeatunnelEngineTask(taskRequest); if (startupScript.contains(STARTUP_SCRIPT_SEATUNNEL)) {
return new SeatunnelEngineTask(taskRequest);
} }
throw new IllegalArgumentException("Unsupported engine type:" + seatunnelParameters.getEngine()); throw new IllegalArgumentException("Unsupported startup script name:" + seatunnelParameters.getStartupScript());
} }
@Override @Override

32
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/flink/SeatunnelFlinkParameters.java

@ -19,41 +19,29 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel.flink;
import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters; import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
public class SeatunnelFlinkParameters extends SeatunnelParameters { public class SeatunnelFlinkParameters extends SeatunnelParameters {
private RunModeEnum runMode; private RunModeEnum runMode;
private String others; private String others;
public static enum RunModeEnum { @Getter
public enum RunModeEnum {
RUN("--run-mode run"), RUN("--run-mode run"),
RUN_APPLICATION("--run-mode run-application"); RUN_APPLICATION("--run-mode run-application");
private String command; private final String command;
RunModeEnum(String command) { RunModeEnum(String command) {
this.command = command; this.command = command;
} }
public String getCommand() {
return command;
}
}
public RunModeEnum getRunMode() {
return runMode;
}
public void setRunMode(RunModeEnum runMode) {
this.runMode = runMode;
}
public String getOthers() {
return others;
}
public void setOthers(String others) {
this.others = others;
} }
} }

25
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/self/SeatunnelEngineParameters.java

@ -20,25 +20,18 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel.self;
import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum; import org.apache.dolphinscheduler.plugin.task.seatunnel.DeployModeEnum;
import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters; import org.apache.dolphinscheduler.plugin.task.seatunnel.SeatunnelParameters;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class SeatunnelEngineParameters extends SeatunnelParameters { public class SeatunnelEngineParameters extends SeatunnelParameters {
private DeployModeEnum deployMode; private DeployModeEnum deployMode;
private String others; private String others;
public DeployModeEnum getDeployMode() {
return deployMode;
}
public void setDeployMode(DeployModeEnum deployMode) {
this.deployMode = deployMode;
}
public String getOthers() {
return others;
}
public void setOthers(String others) {
this.others = others;
}
} }

40
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/spark/SeatunnelSparkParameters.java

@ -24,6 +24,13 @@ import org.apache.commons.lang3.StringUtils;
import java.util.Objects; import java.util.Objects;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
public class SeatunnelSparkParameters extends SeatunnelParameters { public class SeatunnelSparkParameters extends SeatunnelParameters {
private DeployModeEnum deployMode; private DeployModeEnum deployMode;
@ -42,45 +49,18 @@ public class SeatunnelSparkParameters extends SeatunnelParameters {
return result; return result;
} }
public static enum MasterTypeEnum { @Getter
public enum MasterTypeEnum {
YARN("yarn"), YARN("yarn"),
LOCAL("local"), LOCAL("local"),
SPARK("spark://"), SPARK("spark://"),
MESOS("mesos://"); MESOS("mesos://");
private String command; private final String command;
MasterTypeEnum(String command) { MasterTypeEnum(String command) {
this.command = command; this.command = command;
} }
public String getCommand() {
return command;
}
}
public DeployModeEnum getDeployMode() {
return deployMode;
}
public void setDeployMode(DeployModeEnum deployMode) {
this.deployMode = deployMode;
}
public MasterTypeEnum getMaster() {
return master;
}
public void setMaster(MasterTypeEnum master) {
this.master = master;
}
public String getMasterUrl() {
return masterUrl;
}
public void setMasterUrl(String masterUrl) {
this.masterUrl = masterUrl;
} }
} }

2
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -798,6 +798,8 @@ export default {
custom_config: 'Custom Config', custom_config: 'Custom Config',
engine: 'engine', engine: 'engine',
engine_tips: 'Please select engine', engine_tips: 'Please select engine',
startup_script: 'Startup script',
startup_script_tips: 'Please select startup script',
run_mode: 'Run Mode', run_mode: 'Run Mode',
dinky_address: 'Dinky address', dinky_address: 'Dinky address',
dinky_address_tips: 'Please enter the url of your dinky', dinky_address_tips: 'Please enter the url of your dinky',

2
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -777,6 +777,8 @@ export default {
custom_config: '自定义配置', custom_config: '自定义配置',
engine: '引擎', engine: '引擎',
engine_tips: '请选择引擎', engine_tips: '请选择引擎',
startup_script: '启动脚本',
startup_script_tips: '请选择启动脚本',
run_mode: '运行模式', run_mode: '运行模式',
dinky_address: 'dinky 地址', dinky_address: 'dinky 地址',
dinky_address_tips: '请输入 Dinky 地址', dinky_address_tips: '请输入 Dinky 地址',

78
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sea-tunnel.ts

@ -24,34 +24,44 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] {
const configEditorSpan = computed(() => (model.useCustom ? 24 : 0)) const configEditorSpan = computed(() => (model.useCustom ? 24 : 0))
const resourceEditorSpan = computed(() => (model.useCustom ? 0 : 24)) const resourceEditorSpan = computed(() => (model.useCustom ? 0 : 24))
const flinkSpan = computed(() => (model.engine === 'FLINK' || model.engine === 'FLINK_V2' ? 24 : 0)) const flinkSpan = computed(() => (model.startupScript.includes("flink") ? 24 : 0))
const deployModeSpan = computed(() => (model.engine === 'SPARK' || model.engine === 'SPARK_V2' || model.engine === "SEATUNNEL_ENGINE" ? 24 : 0)) const deployModeSpan = computed(() => (model.startupScript.includes("spark") || model.startupScript === "seatunnel.sh" ? 24 : 0))
const masterSpan = computed(() => const masterSpan = computed(() => (model.startupScript.includes("spark")) && model.deployMode !== 'local' ? 12 : 0)
(model.engine === 'SPARK' || model.engine === 'SPARK_V2') && model.deployMode !== 'local' ? 12 : 0
)
const masterUrlSpan = computed(() => const masterUrlSpan = computed(() =>
(model.engine === 'SPARK' || model.engine === 'SPARK_V2') && (model.startupScript.includes("spark")) &&
model.deployMode !== 'local' && model.deployMode !== 'local' &&
(model.master === 'SPARK' || model.master === 'MESOS') (model.master === 'SPARK' || model.master === 'MESOS')
? 12 ? 12
: 0 : 0
) )
const showClient = computed(() => model.engine === 'SPARK' || model.engine === 'SPARK_V2') const showClient = computed(() => model.startupScript.includes("spark"))
const showLocal = computed(() => model.engine === 'SEATUNNEL_ENGINE') const showLocal = computed(() => model.startupScript === 'seatunnel.sh')
const othersSpan = computed(() => (model.engine === 'FLINK' || model.engine === 'FLINK_V2' || model.engine === 'SEATUNNEL_ENGINE' ? 24 : 0)) const othersSpan = computed(() => (model.startupScript.includes("flink") || model.startupScript === 'seatunnel.sh' ? 24 : 0))
return [ return [
{ {
type: 'select', type: 'select',
field: 'engine', field: 'startupScript',
span: 12, span: 15,
name: t('project.node.engine'), name: t('project.node.startup_script'),
options: ENGINE, options: STARTUP_SCRIPT,
validate: { validate: {
trigger: ['input', 'blur'], trigger: ['input', 'blur'],
required: true, required: true,
message: t('project.node.engine_tips') message: t('project.node.startup_script_tips')
} },
props: {
'on-update:value': (value: boolean) => {
if (value) {
if (model.startupScript === 'seatunnel.sh') {
model.deployMode = 'local'
}
if (model.startupScript.includes("spark")) {
model.deployMode = 'client'
}
}
}
},
}, },
// SeaTunnel flink parameter // SeaTunnel flink parameter
@ -130,26 +140,42 @@ export function useSeaTunnel(model: { [field: string]: any }): IJsonItem[] {
] ]
} }
export const ENGINE = [ export const STARTUP_SCRIPT = [
{ {
label: 'SPARK', label: 'seatunnel.sh',
value: 'SPARK' value: 'seatunnel.sh'
},
{
label: 'start-seatunnel-flink-13-connector-v2.sh',
value: 'start-seatunnel-flink-13-connector-v2.sh'
},
{
label: 'start-seatunnel-flink-15-connector-v2.sh',
value: 'start-seatunnel-flink-15-connector-v2.sh'
},
{
label: 'start-seatunnel-flink-connector-v2.sh',
value: 'start-seatunnel-flink-connector-v2.sh'
},
{
label: 'start-seatunnel-flink.sh',
value: 'start-seatunnel-flink.sh'
}, },
{ {
label: 'FLINK', label: 'start-seatunnel-spark-2-connector-v2.sh',
value: 'FLINK' value: 'start-seatunnel-spark-2-connector-v2.sh'
}, },
{ {
label: 'SPARK_V2', label: 'start-seatunnel-spark-3-connector-v2.sh',
value: 'SPARK_V2' value: 'start-seatunnel-spark-3-connector-v2.sh'
}, },
{ {
label: 'FLINK_V2', label: 'start-seatunnel-spark-connector-v2.sh',
value: 'FLINK_V2' value: 'start-seatunnel-spark-connector-v2.sh'
}, },
{ {
label: 'SEATUNNEL_ENGINE', label: 'start-seatunnel-spark.sh',
value: 'SEATUNNEL_ENGINE' value: 'start-seatunnel-spark.sh'
} }
] ]

26
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -216,21 +216,21 @@ export function formatParams(data: INodeData): {
} }
if (data.taskType === 'SEATUNNEL') { if (data.taskType === 'SEATUNNEL') {
taskParams.engine = data.engine taskParams.startupScript = data.startupScript
taskParams.useCustom = data.useCustom taskParams.useCustom = data.useCustom
taskParams.rawScript = data.rawScript taskParams.rawScript = data.rawScript
switch (data.engine) { if (data.startupScript?.includes("flink")) {
case 'FLINK': taskParams.runMode = data.runMode
taskParams.runMode = data.runMode taskParams.others = data.others
taskParams.others = data.others }
break if (data.startupScript?.includes("spark")) {
case 'SPARK': taskParams.deployMode = data.deployMode
taskParams.deployMode = data.deployMode taskParams.master = data.master
taskParams.master = data.master taskParams.masterUrl = data.masterUrl
taskParams.masterUrl = data.masterUrl }
break if (data.startupScript === "seatunnel.sh") {
default: taskParams.deployMode = data.deployMode
break taskParams.others = data.others
} }
} }

44
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts

@ -46,7 +46,7 @@ export function useSeaTunnel({
memoryMax: -1, memoryMax: -1,
delayTime: 0, delayTime: 0,
timeout: 30, timeout: 30,
engine: 'FLINK', startupScript: 'seatunnel.sh',
runMode: 'RUN', runMode: 'RUN',
useCustom: true, useCustom: true,
deployMode: 'client', deployMode: 'client',
@ -56,25 +56,29 @@ export function useSeaTunnel({
timeoutNotifyStrategy: ['WARN'], timeoutNotifyStrategy: ['WARN'],
rawScript: rawScript:
'env {\n' + 'env {\n' +
' execution.parallelism = 1\n' + ' execution.parallelism = 2\n' +
'}\n' + ' job.mode = "BATCH"\n' +
'\n' + ' checkpoint.interval = 10000\n' +
'source {\n' + '}\n' +
' FakeSourceStream {\n' + '\n' +
' result_table_name = "fake"\n' + 'source {\n' +
' field_name = "name,age"\n' + ' FakeSource {\n' +
' }\n' + ' parallelism = 2\n' +
'}\n' + ' result_table_name = "fake"\n' +
'\n' + ' row.num = 16\n' +
'transform {\n' + ' schema = {\n' +
' sql {\n' + ' fields {\n' +
' sql = "select name,age from fake"\n' + ' name = "string"\n' +
' }\n' + ' age = "int"\n' +
'}\n' + ' }\n' +
'\n' + ' }\n' +
'sink {\n' + ' }\n' +
' ConsoleSink {}\n' + '}\n' +
'}' '\n' +
'sink {\n' +
' Console {\n' +
' }\n' +
'}'
} as INodeData) } as INodeData)
return { return {

1
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -342,6 +342,7 @@ interface ITaskParams {
parameters?: string parameters?: string
kernel?: string kernel?: string
engine?: string engine?: string
startupScript?: string
executionTimeout?: string executionTimeout?: string
startTimeout?: string startTimeout?: string
processDefinitionCode?: number processDefinitionCode?: number

Loading…
Cancel
Save