Browse Source

[Improvement][Spark] Support Local Spark Cluster (#15589)

* [Improvement][Spark] Support Local Spark Cluster

* remove default local from deploy mode

---------

Co-authored-by: Rick Cheng <rickchengx@gmail.com>
3.2.2-release-bak
John Huang 7 months ago committed by GitHub
parent
commit
6c78c8ec9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      docs/docs/en/guide/task/spark.md
  2. 1
      docs/docs/zh/guide/task/spark.md
  3. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
  4. 23
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
  5. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
  6. 93
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
  7. 2
      dolphinscheduler-ui/src/locales/en_US/project.ts
  8. 2
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  9. 26
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
  10. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  11. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts

1
docs/docs/en/guide/task/spark.md

@@ -24,6 +24,7 @@ Spark task type for executing Spark application. When executing the Spark task,
|----------------------------|------------------------------------------------------------------------------------------------------------------------------------| |----------------------------|------------------------------------------------------------------------------------------------------------------------------------|
| Program type | Supports Java, Scala, Python, and SQL. | | Program type | Supports Java, Scala, Python, and SQL. |
| The class of main function | The **full path** of Main Class, the entry point of the Spark program. | | The class of main function | The **full path** of Main Class, the entry point of the Spark program. |
| Master | The master URL for the cluster. |
| Main jar package | The Spark jar package (upload by Resource Center). | | Main jar package | The Spark jar package (upload by Resource Center). |
| SQL scripts | SQL statements in .sql files that Spark sql runs. | | SQL scripts | SQL statements in .sql files that Spark sql runs. |
| Deployment mode | <ul><li>spark submit supports three modes: cluster, client and local.</li><li>spark sql supports client and local modes.</li></ul> | | Deployment mode | <ul><li>spark submit supports three modes: cluster, client and local.</li><li>spark sql supports client and local modes.</li></ul> |

1
docs/docs/zh/guide/task/spark.md

@@ -23,6 +23,7 @@ Spark 任务类型用于执行 Spark 应用。对于 Spark 节点,worker 支
- 程序类型:支持 Java、Scala、Python 和 SQL 四种语言。 - 程序类型:支持 Java、Scala、Python 和 SQL 四种语言。
- 主函数的 Class:Spark 程序的入口 Main class 的全路径。 - 主函数的 Class:Spark 程序的入口 Main class 的全路径。
- 主程序包:执行 Spark 程序的 jar 包(通过资源中心上传)。 - 主程序包:执行 Spark 程序的 jar 包(通过资源中心上传)。
- Master:Spark 集群的 Master URL。
- SQL脚本:Spark sql 运行的 .sql 文件中的 SQL 语句。 - SQL脚本:Spark sql 运行的 .sql 文件中的 SQL 语句。
- 部署方式:(1) spark submit 支持 cluster、client 和 local 三种模式。 - 部署方式:(1) spark submit 支持 cluster、client 和 local 三种模式。
(2) spark sql 支持 client 和 local 两种模式。 (2) spark sql 支持 client 和 local 两种模式。

5
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java

@@ -38,6 +38,11 @@ public class SparkParameters extends AbstractParameters {
*/ */
private String mainClass; private String mainClass;
/**
* master url
*/
private String master;
/** /**
* deploy mode local / cluster / client * deploy mode local / cluster / client
*/ */

23
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

@@ -124,22 +124,31 @@ public class SparkTask extends AbstractYarnTask {
*/ */
private List<String> populateSparkOptions() { private List<String> populateSparkOptions() {
List<String> args = new ArrayList<>(); List<String> args = new ArrayList<>();
args.add(SparkConstants.MASTER);
// see https://spark.apache.org/docs/latest/submitting-applications.html
// TODO remove the option 'local' from deploy-mode
String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode() String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode()
: SparkConstants.DEPLOY_MODE_LOCAL; : SparkConstants.DEPLOY_MODE_LOCAL;
boolean onLocal = SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode);
boolean onNativeKubernetes = StringUtils.isNotEmpty(sparkParameters.getNamespace()); boolean onNativeKubernetes = StringUtils.isNotEmpty(sparkParameters.getNamespace());
String masterUrl = onNativeKubernetes ? SPARK_ON_K8S_MASTER_PREFIX + String masterUrl = StringUtils.isNotEmpty(sparkParameters.getMaster()) ? sparkParameters.getMaster()
Config.fromKubeconfig(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()).getMasterUrl() : onLocal ? deployMode
: SparkConstants.SPARK_ON_YARN; : onNativeKubernetes
? SPARK_ON_K8S_MASTER_PREFIX + Config
.fromKubeconfig(
taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml())
.getMasterUrl()
: SparkConstants.SPARK_ON_YARN;
args.add(SparkConstants.MASTER);
args.add(masterUrl);
if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) { if (!onLocal) {
args.add(masterUrl);
args.add(SparkConstants.DEPLOY_MODE); args.add(SparkConstants.DEPLOY_MODE);
args.add(deployMode);
} }
args.add(deployMode);
ProgramType programType = sparkParameters.getProgramType(); ProgramType programType = sparkParameters.getProgramType();
String mainClass = sparkParameters.getMainClass(); String mainClass = sparkParameters.getMainClass();

1
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java

@@ -54,6 +54,5 @@ public class SparkParametersTest {
resourceFilesList = sparkParameters.getResourceFilesList(); resourceFilesList = sparkParameters.getResourceFilesList();
Assertions.assertNotNull(resourceFilesList); Assertions.assertNotNull(resourceFilesList);
Assertions.assertEquals(3, resourceFilesList.size()); Assertions.assertEquals(3, resourceFilesList.size());
} }
} }

93
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java

@@ -17,16 +17,27 @@
package org.apache.dolphinscheduler.plugin.task.spark; package org.apache.dolphinscheduler.plugin.task.spark;
import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.TYPE_FILE;
import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
@@ -41,25 +52,67 @@ public class SparkTaskTest {
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn("5536"); Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
ResourceContext resourceContext = Mockito.mock(ResourceContext.class);
Mockito.when(taskExecutionContext.getResourceContext()).thenReturn(resourceContext);
ResourceContext.ResourceItem resourceItem = new ResourceContext.ResourceItem();
resourceItem.setResourceAbsolutePathInLocal("test");
Mockito.when(resourceContext.getResourceItem(any())).thenReturn(resourceItem);
try (MockedStatic<FileUtils> fileUtilsMockedStatic = Mockito.mockStatic(FileUtils.class)) {
fileUtilsMockedStatic
.when(() -> FileUtils
.readFileToString(any(File.class), any(Charset.class)))
.thenReturn("test");
SparkTask sparkTask = Mockito.spy(new SparkTask(taskExecutionContext));
sparkTask.init();
Assertions.assertEquals(
"${SPARK_HOME}/bin/spark-sql " +
"--master yarn " +
"--deploy-mode client " +
"--conf spark.driver.cores=1 " +
"--conf spark.driver.memory=512M " +
"--conf spark.executor.instances=2 " +
"--conf spark.executor.cores=2 " +
"--conf spark.executor.memory=1G " +
"--name sparksql " +
"-f /tmp/5536_node.sql",
sparkTask.getScript());
}
}
@Test
public void testBuildCommandWithSparkSubmit() {
String parameters = buildSparkParametersWithSparkSubmit();
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
ResourceContext.ResourceItem resourceItem = new ResourceContext.ResourceItem();
resourceItem.setResourceAbsolutePathInStorage("/lib/dolphinscheduler-task-spark.jar");
resourceItem.setResourceAbsolutePathInLocal("/lib/dolphinscheduler-task-spark.jar");
ResourceContext resourceContext = new ResourceContext();
resourceContext.addResourceItem(resourceItem);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
Mockito.when(taskExecutionContext.getResourceContext()).thenReturn(resourceContext);
SparkTask sparkTask = Mockito.spy(new SparkTask(taskExecutionContext)); SparkTask sparkTask = Mockito.spy(new SparkTask(taskExecutionContext));
sparkTask.init(); sparkTask.init();
Assertions.assertEquals( Assertions.assertEquals(
"${SPARK_HOME}/bin/spark-sql " + "${SPARK_HOME}/bin/spark-submit " +
"--master yarn " + "--master yarn " +
"--deploy-mode client " + "--deploy-mode client " +
"--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
"--conf spark.driver.cores=1 " + "--conf spark.driver.cores=1 " +
"--conf spark.driver.memory=512M " + "--conf spark.driver.memory=512M " +
"--conf spark.executor.instances=2 " + "--conf spark.executor.instances=2 " +
"--conf spark.executor.cores=2 " + "--conf spark.executor.cores=2 " +
"--conf spark.executor.memory=1G " + "--conf spark.executor.memory=1G " +
"--name sparksql " + "--name spark " +
"-f /tmp/5536_node.sql", "/lib/dolphinscheduler-task-spark.jar",
sparkTask.getScript()); sparkTask.getScript());
} }
@Test @Test
public void testBuildCommandWithSparkSubmit() { public void testBuildCommandWithSparkSubmitMaster() {
String parameters = buildSparkParametersWithSparkSubmit(); String parameters = buildSparkParametersWithMaster();
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
ResourceContext.ResourceItem resourceItem = new ResourceContext.ResourceItem(); ResourceContext.ResourceItem resourceItem = new ResourceContext.ResourceItem();
resourceItem.setResourceAbsolutePathInStorage("/lib/dolphinscheduler-task-spark.jar"); resourceItem.setResourceAbsolutePathInStorage("/lib/dolphinscheduler-task-spark.jar");
@@ -73,7 +126,7 @@ public class SparkTaskTest {
sparkTask.init(); sparkTask.init();
Assertions.assertEquals( Assertions.assertEquals(
"${SPARK_HOME}/bin/spark-submit " + "${SPARK_HOME}/bin/spark-submit " +
"--master yarn " + "--master spark://localhost:7077 " +
"--deploy-mode client " + "--deploy-mode client " +
"--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " + "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
"--conf spark.driver.cores=1 " + "--conf spark.driver.cores=1 " +
@@ -91,6 +144,7 @@ public class SparkTaskTest {
sparkParameters.setLocalParams(Collections.emptyList()); sparkParameters.setLocalParams(Collections.emptyList());
sparkParameters.setRawScript("selcet 11111;"); sparkParameters.setRawScript("selcet 11111;");
sparkParameters.setProgramType(ProgramType.SQL); sparkParameters.setProgramType(ProgramType.SQL);
sparkParameters.setSqlExecutionType(TYPE_FILE);
sparkParameters.setMainClass(""); sparkParameters.setMainClass("");
sparkParameters.setDeployMode("client"); sparkParameters.setDeployMode("client");
sparkParameters.setAppName("sparksql"); sparkParameters.setAppName("sparksql");
@@ -100,6 +154,13 @@ public class SparkTaskTest {
sparkParameters.setNumExecutors(2); sparkParameters.setNumExecutors(2);
sparkParameters.setExecutorMemory("1G"); sparkParameters.setExecutorMemory("1G");
sparkParameters.setExecutorCores(2); sparkParameters.setExecutorCores(2);
ResourceInfo resourceInfo1 = new ResourceInfo();
resourceInfo1.setResourceName("testSparkParameters1.jar");
List<ResourceInfo> resourceInfos = new ArrayList<>(Arrays.asList(
resourceInfo1));
sparkParameters.setResourceList(resourceInfos);
return JSONUtils.toJsonString(sparkParameters); return JSONUtils.toJsonString(sparkParameters);
} }
@@ -122,4 +183,24 @@ public class SparkTaskTest {
return JSONUtils.toJsonString(sparkParameters); return JSONUtils.toJsonString(sparkParameters);
} }
private String buildSparkParametersWithMaster() {
SparkParameters sparkParameters = new SparkParameters();
sparkParameters.setLocalParams(Collections.emptyList());
sparkParameters.setProgramType(ProgramType.SCALA);
sparkParameters.setMainClass("org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest");
sparkParameters.setDeployMode("client");
sparkParameters.setAppName("spark");
sparkParameters.setMaster("spark://localhost:7077");
sparkParameters.setOthers("");
sparkParameters.setDriverCores(1);
sparkParameters.setDriverMemory("512M");
sparkParameters.setNumExecutors(2);
sparkParameters.setExecutorMemory("1G");
sparkParameters.setExecutorCores(2);
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setResourceName("/lib/dolphinscheduler-task-spark.jar");
sparkParameters.setMainJar(resourceInfo);
return JSONUtils.toJsonString(sparkParameters);
}
} }

2
dolphinscheduler-ui/src/locales/en_US/project.ts

@@ -447,6 +447,8 @@ export default {
timeout_period_tips: 'Timeout must be a positive integer', timeout_period_tips: 'Timeout must be a positive integer',
script: 'Script', script: 'Script',
script_tips: 'Please enter script(required)', script_tips: 'Please enter script(required)',
master: 'Master',
master_tips: 'Please enter master url(required)',
init_script: 'Initialization script', init_script: 'Initialization script',
init_script_tips: 'Please enter initialization script', init_script_tips: 'Please enter initialization script',
resources: 'Resources', resources: 'Resources',

2
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@@ -437,6 +437,8 @@ export default {
timeout_period_tips: '超时时长必须为正整数', timeout_period_tips: '超时时长必须为正整数',
script: '脚本', script: '脚本',
script_tips: '请输入脚本(必填)', script_tips: '请输入脚本(必填)',
master: 'Master',
master_tips: '请输入master url(必填)',
init_script: '初始化脚本', init_script: '初始化脚本',
init_script_tips: '请输入初始化脚本', init_script_tips: '请输入初始化脚本',
resources: '资源', resources: '资源',

26
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts

@@ -37,6 +37,10 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
model.programType === 'PYTHON' || model.programType === 'SQL' ? 0 : 24 model.programType === 'PYTHON' || model.programType === 'SQL' ? 0 : 24
) )
const masterSpan = computed(() =>
model.programType === 'PYTHON' || model.programType === 'SQL' ? 0 : 24
)
const mainArgsSpan = computed(() => (model.programType === 'SQL' ? 0 : 24)) const mainArgsSpan = computed(() => (model.programType === 'SQL' ? 0 : 24))
const rawScriptSpan = computed(() => const rawScriptSpan = computed(() =>
@@ -138,6 +142,28 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
message: t('project.node.script_tips') message: t('project.node.script_tips')
} }
}, },
{
type: 'input',
field: 'master',
span: masterSpan,
name: t('project.node.master'),
props: {
placeholder: t('project.node.master_tips')
},
validate: {
trigger: ['input', 'blur'],
required: false,
validator(validate: any, value: string) {
if (
model.programType !== 'PYTHON' &&
!value &&
model.programType !== 'SQL'
) {
return new Error(t('project.node.master_tips'))
}
}
}
},
useDeployMode(24, ref(true), showCluster), useDeployMode(24, ref(true), showCluster),
useNamespace(), useNamespace(),
{ {

1
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@@ -68,6 +68,7 @@ export function formatParams(data: INodeData): {
} }
if (data.taskType === 'SPARK') { if (data.taskType === 'SPARK') {
taskParams.master = data.master
taskParams.driverCores = data.driverCores taskParams.driverCores = data.driverCores
taskParams.driverMemory = data.driverMemory taskParams.driverMemory = data.driverMemory
taskParams.numExecutors = data.numExecutors taskParams.numExecutors = data.numExecutors

3
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts

@@ -45,7 +45,8 @@ export function useSpark({
timeout: 30, timeout: 30,
programType: 'SCALA', programType: 'SCALA',
rawScript: '', rawScript: '',
deployMode: 'local', master: '',
deployMode: '',
driverCores: 1, driverCores: 1,
driverMemory: '512M', driverMemory: '512M',
numExecutors: 2, numExecutors: 2,

Loading…
Cancel
Save