diff --git a/docs/docs/en/guide/task/spark.md b/docs/docs/en/guide/task/spark.md
index 3e0f83b253..930f2cd0b0 100644
--- a/docs/docs/en/guide/task/spark.md
+++ b/docs/docs/en/guide/task/spark.md
@@ -24,6 +24,7 @@ Spark task type for executing Spark application. When executing the Spark task,
|----------------------------|------------------------------------------------------------------------------------------------------------------------------------|
| Program type | Supports Java, Scala, Python, and SQL. |
| The class of main function | The **full path** of Main Class, the entry point of the Spark program. |
+| Master | The master URL for the cluster. |
| Main jar package | The Spark jar package (upload by Resource Center). |
| SQL scripts | SQL statements in .sql files that Spark sql runs. |
| Deployment mode | (1) spark submit supports three modes: cluster, client and local. (2) spark sql supports client and local modes. |
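The Master field is handed to `spark-submit` verbatim as the `--master` option, so any URL form from the Spark submitting-applications guide should work. A minimal, illustrative sketch of common values (the class name and printed format are ours, not part of the docs):

```java
// Illustrative only: common Master values accepted by spark-submit, per
// https://spark.apache.org/docs/latest/submitting-applications.html
public class MasterUrlExamples {
    public static void main(String[] args) {
        String[] masters = {
                "yarn",                     // submit to YARN (the default when Master is left empty)
                "spark://host:7077",        // Spark standalone cluster
                "k8s://https://host:6443",  // native Kubernetes API server
                "local[*]"                  // local mode using all available cores
        };
        for (String master : masters) {
            System.out.println("--master " + master);
        }
    }
}
```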
diff --git a/docs/docs/zh/guide/task/spark.md b/docs/docs/zh/guide/task/spark.md
index a392f55826..2f7b2ee346 100644
--- a/docs/docs/zh/guide/task/spark.md
+++ b/docs/docs/zh/guide/task/spark.md
@@ -23,6 +23,7 @@ Spark 任务类型用于执行 Spark 应用。对于 Spark 节点,worker 支
- 程序类型:支持 Java、Scala、Python 和 SQL 四种语言。
- 主函数的 Class:Spark 程序的入口 Main class 的全路径。
- 主程序包:执行 Spark 程序的 jar 包(通过资源中心上传)。
+- Master:Spark 集群的 Master URL。
- SQL脚本:Spark sql 运行的 .sql 文件中的 SQL 语句。
- 部署方式:(1) spark submit 支持 cluster、client 和 local 三种模式。(2) spark sql 支持 client 和 local 两种模式。
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
index c5fcb5b76b..873ba22c71 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
@@ -38,6 +38,11 @@ public class SparkParameters extends AbstractParameters {
*/
private String mainClass;
+ /**
+ * master url
+ */
+ private String master;
+
/**
* deploy mode local / cluster / client
*/
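Elsewhere in this patch the field is read via `getMaster()` (in `SparkTask`) and written via `setMaster(...)` (in the tests), and task parameters travel as JSON through `JSONUtils`, so the new field should round-trip like any other. A minimal sketch, assuming the usual getter/setter pair and that the snippet lives alongside `SparkParameters`:

```java
import org.apache.dolphinscheduler.common.utils.JSONUtils;

public class MasterFieldRoundTrip {
    public static void main(String[] args) {
        SparkParameters params = new SparkParameters();
        params.setMaster("spark://localhost:7077");

        // Task parameters are serialized to JSON when a workflow is saved...
        String json = JSONUtils.toJsonString(params);
        // ...and parsed back when the worker picks the task up.
        SparkParameters parsed = JSONUtils.parseObject(json, SparkParameters.class);
        System.out.println(parsed.getMaster()); // spark://localhost:7077
    }
}
```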
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index a0d1f3fc77..3c5fc17698 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -124,22 +124,31 @@ public class SparkTask extends AbstractYarnTask {
*/
private List<String> populateSparkOptions() {
List<String> args = new ArrayList<>();
- args.add(SparkConstants.MASTER);
+ // see https://spark.apache.org/docs/latest/submitting-applications.html
+ // TODO remove the option 'local' from deploy-mode
String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode()
: SparkConstants.DEPLOY_MODE_LOCAL;
+ boolean onLocal = SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode);
boolean onNativeKubernetes = StringUtils.isNotEmpty(sparkParameters.getNamespace());
- String masterUrl = onNativeKubernetes ? SPARK_ON_K8S_MASTER_PREFIX +
- Config.fromKubeconfig(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()).getMasterUrl()
- : SparkConstants.SPARK_ON_YARN;
+ String masterUrl;
+ if (StringUtils.isNotEmpty(sparkParameters.getMaster())) {
+     masterUrl = sparkParameters.getMaster();
+ } else if (onLocal) {
+     masterUrl = deployMode;
+ } else if (onNativeKubernetes) {
+     masterUrl = SPARK_ON_K8S_MASTER_PREFIX
+         + Config.fromKubeconfig(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()).getMasterUrl();
+ } else {
+     masterUrl = SparkConstants.SPARK_ON_YARN;
+ }
+
+ args.add(SparkConstants.MASTER);
+ args.add(masterUrl);
- if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
- args.add(masterUrl);
+ if (!onLocal) {
args.add(SparkConstants.DEPLOY_MODE);
+ args.add(deployMode);
}
- args.add(deployMode);
ProgramType programType = sparkParameters.getProgramType();
String mainClass = sparkParameters.getMainClass();
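The resolution above encodes a strict precedence: an explicitly configured Master wins, then local deploy mode, then native Kubernetes, then the YARN default. A standalone sketch of the same decision (names such as `resolveMasterUrl` and `k8sApiServer` are illustrative, not part of the patch):

```java
public class MasterResolutionSketch {
    // Mirrors the precedence in populateSparkOptions():
    // explicit Master > local deploy mode > native Kubernetes > YARN.
    static String resolveMasterUrl(String master, String deployMode,
                                   boolean onNativeKubernetes, String k8sApiServer) {
        if (master != null && !master.isEmpty()) {
            return master;                   // user-supplied Master always wins
        }
        if ("local".equals(deployMode)) {
            return deployMode;               // local runs pass "local" as the master
        }
        if (onNativeKubernetes) {
            return "k8s://" + k8sApiServer;  // SPARK_ON_K8S_MASTER_PREFIX + kubeconfig URL
        }
        return "yarn";                       // SparkConstants.SPARK_ON_YARN
    }

    public static void main(String[] args) {
        System.out.println(resolveMasterUrl("spark://localhost:7077", "client", false, null)); // spark://localhost:7077
        System.out.println(resolveMasterUrl("", "local", false, null));                        // local
        System.out.println(resolveMasterUrl(null, "cluster", true, "https://host:6443"));      // k8s://https://host:6443
        System.out.println(resolveMasterUrl(null, "cluster", false, null));                    // yarn
    }
}
```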
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
index ab164f2eb5..19ec707c62 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
@@ -54,6 +54,5 @@ public class SparkParametersTest {
resourceFilesList = sparkParameters.getResourceFilesList();
Assertions.assertNotNull(resourceFilesList);
Assertions.assertEquals(3, resourceFilesList.size());
-
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
index 78d8968e59..5138562564 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -17,16 +17,27 @@
package org.apache.dolphinscheduler.plugin.task.spark;
+import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.TYPE_FILE;
+import static org.mockito.ArgumentMatchers.any;
+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -41,25 +52,67 @@ public class SparkTaskTest {
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+ ResourceContext resourceContext = Mockito.mock(ResourceContext.class);
+ Mockito.when(taskExecutionContext.getResourceContext()).thenReturn(resourceContext);
+ ResourceContext.ResourceItem resourceItem = new ResourceContext.ResourceItem();
+ resourceItem.setResourceAbsolutePathInLocal("test");
+ Mockito.when(resourceContext.getResourceItem(any())).thenReturn(resourceItem);
+
+ try (MockedStatic<FileUtils> fileUtilsMockedStatic = Mockito.mockStatic(FileUtils.class)) {
+ fileUtilsMockedStatic
+ .when(() -> FileUtils
+ .readFileToString(any(File.class), any(Charset.class)))
+ .thenReturn("test");
+
+ SparkTask sparkTask = Mockito.spy(new SparkTask(taskExecutionContext));
+ sparkTask.init();
+ Assertions.assertEquals(
+ "${SPARK_HOME}/bin/spark-sql " +
+ "--master yarn " +
+ "--deploy-mode client " +
+ "--conf spark.driver.cores=1 " +
+ "--conf spark.driver.memory=512M " +
+ "--conf spark.executor.instances=2 " +
+ "--conf spark.executor.cores=2 " +
+ "--conf spark.executor.memory=1G " +
+ "--name sparksql " +
+ "-f /tmp/5536_node.sql",
+ sparkTask.getScript());
+ }
+ }
+
+ @Test
+ public void testBuildCommandWithSparkSubmit() {
+ String parameters = buildSparkParametersWithSparkSubmit();
+ TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+ ResourceContext.ResourceItem resourceItem = new ResourceContext.ResourceItem();
+ resourceItem.setResourceAbsolutePathInStorage("/lib/dolphinscheduler-task-spark.jar");
+ resourceItem.setResourceAbsolutePathInLocal("/lib/dolphinscheduler-task-spark.jar");
+ ResourceContext resourceContext = new ResourceContext();
+ resourceContext.addResourceItem(resourceItem);
+
+ Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+ Mockito.when(taskExecutionContext.getResourceContext()).thenReturn(resourceContext);
SparkTask sparkTask = Mockito.spy(new SparkTask(taskExecutionContext));
sparkTask.init();
Assertions.assertEquals(
- "${SPARK_HOME}/bin/spark-sql " +
+ "${SPARK_HOME}/bin/spark-submit " +
"--master yarn " +
"--deploy-mode client " +
+ "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
"--conf spark.driver.cores=1 " +
"--conf spark.driver.memory=512M " +
"--conf spark.executor.instances=2 " +
"--conf spark.executor.cores=2 " +
"--conf spark.executor.memory=1G " +
- "--name sparksql " +
- "-f /tmp/5536_node.sql",
+ "--name spark " +
+ "/lib/dolphinscheduler-task-spark.jar",
sparkTask.getScript());
}
@Test
- public void testBuildCommandWithSparkSubmit() {
- String parameters = buildSparkParametersWithSparkSubmit();
+ public void testBuildCommandWithSparkSubmitMaster() {
+ String parameters = buildSparkParametersWithMaster();
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
ResourceContext.ResourceItem resourceItem = new ResourceContext.ResourceItem();
resourceItem.setResourceAbsolutePathInStorage("/lib/dolphinscheduler-task-spark.jar");
@@ -73,7 +126,7 @@ public class SparkTaskTest {
sparkTask.init();
Assertions.assertEquals(
"${SPARK_HOME}/bin/spark-submit " +
- "--master yarn " +
+ "--master spark://localhost:7077 " +
"--deploy-mode client " +
"--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
"--conf spark.driver.cores=1 " +
@@ -91,6 +144,7 @@ public class SparkTaskTest {
sparkParameters.setLocalParams(Collections.emptyList());
sparkParameters.setRawScript("selcet 11111;");
sparkParameters.setProgramType(ProgramType.SQL);
+ sparkParameters.setSqlExecutionType(TYPE_FILE);
sparkParameters.setMainClass("");
sparkParameters.setDeployMode("client");
sparkParameters.setAppName("sparksql");
@@ -100,6 +154,13 @@ public class SparkTaskTest {
sparkParameters.setNumExecutors(2);
sparkParameters.setExecutorMemory("1G");
sparkParameters.setExecutorCores(2);
+
+ ResourceInfo resourceInfo1 = new ResourceInfo();
+ resourceInfo1.setResourceName("testSparkParameters1.jar");
+ List<ResourceInfo> resourceInfos = new ArrayList<>(Arrays.asList(
+ resourceInfo1));
+ sparkParameters.setResourceList(resourceInfos);
+
return JSONUtils.toJsonString(sparkParameters);
}
@@ -122,4 +183,24 @@ public class SparkTaskTest {
return JSONUtils.toJsonString(sparkParameters);
}
+ private String buildSparkParametersWithMaster() {
+ SparkParameters sparkParameters = new SparkParameters();
+ sparkParameters.setLocalParams(Collections.emptyList());
+ sparkParameters.setProgramType(ProgramType.SCALA);
+ sparkParameters.setMainClass("org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest");
+ sparkParameters.setDeployMode("client");
+ sparkParameters.setAppName("spark");
+ sparkParameters.setMaster("spark://localhost:7077");
+ sparkParameters.setOthers("");
+ sparkParameters.setDriverCores(1);
+ sparkParameters.setDriverMemory("512M");
+ sparkParameters.setNumExecutors(2);
+ sparkParameters.setExecutorMemory("1G");
+ sparkParameters.setExecutorCores(2);
+ ResourceInfo resourceInfo = new ResourceInfo();
+ resourceInfo.setResourceName("/lib/dolphinscheduler-task-spark.jar");
+ sparkParameters.setMainJar(resourceInfo);
+ return JSONUtils.toJsonString(sparkParameters);
+ }
+
}
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index cb50b19fc7..7a39752526 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -447,6 +447,8 @@ export default {
timeout_period_tips: 'Timeout must be a positive integer',
script: 'Script',
script_tips: 'Please enter script(required)',
+ master: 'Master',
+ master_tips: 'Please enter master URL(required)',
init_script: 'Initialization script',
init_script_tips: 'Please enter initialization script',
resources: 'Resources',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 6865a49abc..50e0a821ef 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -437,6 +437,8 @@ export default {
timeout_period_tips: '超时时长必须为正整数',
script: '脚本',
script_tips: '请输入脚本(必填)',
+ master: 'Master',
+ master_tips: '请输入 master URL(必填)',
init_script: '初始化脚本',
init_script_tips: '请输入初始化脚本',
resources: '资源',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
index ad7fb77fa9..ab89e69e6d 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
@@ -37,6 +37,10 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
model.programType === 'PYTHON' || model.programType === 'SQL' ? 0 : 24
)
+ const masterSpan = computed(() =>
+ model.programType === 'PYTHON' || model.programType === 'SQL' ? 0 : 24
+ )
+
const mainArgsSpan = computed(() => (model.programType === 'SQL' ? 0 : 24))
const rawScriptSpan = computed(() =>
@@ -138,6 +142,28 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
message: t('project.node.script_tips')
}
},
+ {
+ type: 'input',
+ field: 'master',
+ span: masterSpan,
+ name: t('project.node.master'),
+ props: {
+ placeholder: t('project.node.master_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ required: false,
+ validator(validate: any, value: string) {
+ if (
+ !value &&
+ model.programType !== 'PYTHON' &&
+ model.programType !== 'SQL'
+ ) {
+ return new Error(t('project.node.master_tips'))
+ }
+ }
+ }
+ },
useDeployMode(24, ref(true), showCluster),
useNamespace(),
{
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index ef9e5dec61..2ab1712b6f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -68,6 +68,7 @@ export function formatParams(data: INodeData): {
}
if (data.taskType === 'SPARK') {
+ taskParams.master = data.master
taskParams.driverCores = data.driverCores
taskParams.driverMemory = data.driverMemory
taskParams.numExecutors = data.numExecutors
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
index 05aa0fe1c6..1e5c929b0f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
@@ -45,7 +45,8 @@ export function useSpark({
timeout: 30,
programType: 'SCALA',
rawScript: '',
- deployMode: 'local',
+ master: '',
+ deployMode: '',
driverCores: 1,
driverMemory: '512M',
numExecutors: 2,