From 4d8d74ac1816e9ba11038d57cffb491074ce0a78 Mon Sep 17 00:00:00 2001
From: Yelli <51317527+Yeleights@users.noreply.github.com>
Date: Tue, 17 Dec 2019 11:33:11 +0800
Subject: [PATCH] #839 enhancement: add Spark Task Component can switch Spark Version (#1494)

* add Spark Version in Spark Component

* add license for SparkVersion.class

* 1 add spark task UT
  2 add spark version param check

* add assert check for sparkTaskTest
---
 .../common/enums/SparkVersion.java            |  40 +++++
 .../common/task/spark/SparkParameters.java    |  17 ++-
 .../server/worker/task/spark/SparkTask.java   |  21 ++-
 .../worker/task/spark/SparkTaskTest.java      | 141 ++++++++++++++++++
 .../dag/_source/formModel/tasks/spark.vue     |  26 +++-
 .../src/js/module/i18n/locale/en_US.js        |   4 +-
 .../src/js/module/i18n/locale/zh_CN.js        |   3 +-
 7 files changed, 241 insertions(+), 11 deletions(-)
 create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
 create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
new file mode 100644
index 0000000000..e3f7c73797
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import lombok.Getter;
+
+@Getter
+public enum SparkVersion {
+
+    /**
+     * 0 SPARK1
+     * 1 SPARK2
+     */
+    SPARK1(0, "SPARK1"),
+    SPARK2(1, "SPARK2");
+
+    SparkVersion(int code, String descp){
+        this.code = code;
+        this.descp = descp;
+    }
+
+    @EnumValue
+    private final int code;
+    private final String descp;
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
index 41263f0a74..dbafddfddd 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
@@ -95,6 +95,11 @@ public class SparkParameters extends AbstractParameters {
    */
   private ProgramType programType;
 
+  /**
+   * spark version
+   */
+  private String sparkVersion;
+
   public ResourceInfo getMainJar() {
     return mainJar;
   }
@@ -200,9 +205,17 @@ public class SparkParameters extends AbstractParameters {
     this.programType = programType;
   }
 
+  public String getSparkVersion() {
+    return sparkVersion;
+  }
+
+  public void setSparkVersion(String sparkVersion) {
+    this.sparkVersion = sparkVersion;
+  }
+
   @Override
   public boolean checkParameters() {
-    return mainJar != null && programType != null;
+    return mainJar != null && programType != null && sparkVersion != null;
   }
 
 
@@ -211,7 +224,7 @@
     if(resourceList !=null ) {
       this.resourceList.add(mainJar);
       return resourceList.stream()
-              .map(p -> p.getRes()).collect(Collectors.toList());
+              .map(ResourceInfo::getRes).collect(Collectors.toList());
     }
     return null;
   }
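A note on how the new field travels: sparkVersion is carried inside the task-definition JSON and is now mandatory, because checkParameters() returns false when it is null. The sketch below (illustrative only, not part of the patch; the class name SparkVersionParamSketch is made up) parses a params string the same way SparkTaskTest further down does, using the real SparkParameters and JSONUtils classes:

    import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
    import org.apache.dolphinscheduler.common.utils.JSONUtils;

    public class SparkVersionParamSketch {

        public static void main(String[] args) {
            // Minimal params JSON carrying the new "sparkVersion" field
            String taskParams = "{"
                    + "\"mainClass\":\"basicetl.GlobalUserCar\","
                    + "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"},"
                    + "\"programType\":\"SCALA\","
                    + "\"sparkVersion\":\"SPARK1\""
                    + "}";

            SparkParameters parameters = JSONUtils.parseObject(taskParams, SparkParameters.class);

            if (parameters != null) {
                // mainJar, programType and sparkVersion are all present, so the check passes
                System.out.println("valid: " + parameters.checkParameters());
                // prints SPARK1; SparkTask uses this value to pick the spark-submit binary
                System.out.println("sparkVersion: " + parameters.getSparkVersion());
            }
        }
    }

Removing the sparkVersion entry from that JSON makes checkParameters() return false, which is exactly the stricter validation introduced above.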
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index 1fd54785d1..34f7d13ca8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -16,6 +16,8 @@
  */
 package org.apache.dolphinscheduler.server.worker.task.spark;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.dolphinscheduler.common.enums.SparkVersion;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
@@ -25,7 +27,6 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 import org.apache.dolphinscheduler.server.worker.task.TaskProps;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -38,9 +39,14 @@ import java.util.Map;
 public class SparkTask extends AbstractYarnTask {
 
   /**
-   * spark command
+   * spark1 command
+   */
+  private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
+
+  /**
+   * spark2 command
    */
-  private static final String SPARK_COMMAND = "spark-submit";
+  private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
 
   /**
    * spark parameters
@@ -89,7 +95,14 @@ public class SparkTask extends AbstractYarnTask {
   protected String buildCommand() {
     List<String> args = new ArrayList<>();
 
-    args.add(SPARK_COMMAND);
+    //spark version
+    String sparkCommand = SPARK2_COMMAND;
+
+    if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
+      sparkCommand = SPARK1_COMMAND;
+    }
+
+    args.add(sparkCommand);
 
     // other parameters
     args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
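The worker-side change is a small switch: SPARK1 is the only value that selects the Spark 1 binary, and anything else (including a missing value) falls back to Spark 2. A self-contained sketch of that resolution logic (the helper class and method names are hypothetical; only the SparkVersion enum and the two command templates come from the patch):

    import org.apache.dolphinscheduler.common.enums.SparkVersion;

    public class SparkSubmitCommandSketch {

        private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
        private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";

        // Mirrors the branch in SparkTask.buildCommand(): SPARK1 picks the Spark 1 binary,
        // every other value defaults to the Spark 2 binary.
        static String resolveSparkCommand(String sparkVersion) {
            if (SparkVersion.SPARK1.name().equals(sparkVersion)) {
                return SPARK1_COMMAND;
            }
            return SPARK2_COMMAND;
        }

        public static void main(String[] args) {
            System.out.println(resolveSparkCommand("SPARK1"));  // ${SPARK_HOME1}/bin/spark-submit
            System.out.println(resolveSparkCommand("SPARK2"));  // ${SPARK_HOME2}/bin/spark-submit
            System.out.println(resolveSparkCommand(null));      // ${SPARK_HOME2}/bin/spark-submit
        }
    }

The ${SPARK_HOME1} and ${SPARK_HOME2} placeholders are left for the shell that runs the generated task script to expand, so this presumably relies on both variables being exported in the worker environment (for example via dolphinscheduler_env.sh); that is an assumption of this note, not something the patch itself enforces.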
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java
new file mode 100644
index 0000000000..b502e13bc6
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.worker.task.spark;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.dolphinscheduler.common.enums.SparkVersion;
+import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.server.utils.ParamUtils;
+import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
+import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class SparkTaskTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(SparkTaskTest.class);
+
+    /**
+     * spark1 command
+     */
+    private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
+
+    /**
+     * spark2 command
+     */
+    private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
+
+    @Test
+    public void testSparkTaskInit() {
+
+        TaskProps taskProps = new TaskProps();
+
+        String spark1Params = "{"
+                + "\"mainArgs\":\"\", "
+                + "\"driverMemory\":\"1G\", "
+                + "\"executorMemory\":\"2G\", "
+                + "\"programType\":\"SCALA\", "
+                + "\"mainClass\":\"basicetl.GlobalUserCar\", "
+                + "\"driverCores\":\"2\", "
+                + "\"deployMode\":\"cluster\", "
+                + "\"executorCores\":2, "
+                + "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, "
+                + "\"sparkVersion\":\"SPARK1\", "
+                + "\"numExecutors\":\"10\", "
+                + "\"localParams\":[], "
+                + "\"others\":\"\", "
+                + "\"resourceList\":[]"
+                + "}";
+
+        String spark2Params = "{"
+                + "\"mainArgs\":\"\", "
+                + "\"driverMemory\":\"1G\", "
+                + "\"executorMemory\":\"2G\", "
+                + "\"programType\":\"SCALA\", "
+                + "\"mainClass\":\"basicetl.GlobalUserCar\", "
+                + "\"driverCores\":\"2\", "
+                + "\"deployMode\":\"cluster\", "
+                + "\"executorCores\":2, "
+                + "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, "
+                + "\"sparkVersion\":\"SPARK2\", "
+                + "\"numExecutors\":\"10\", "
+                + "\"localParams\":[], "
+                + "\"others\":\"\", "
+                + "\"resourceList\":[]"
+                + "}";
+
+        taskProps.setTaskParams(spark2Params);
+
+        logger.info("spark task params {}", taskProps.getTaskParams());
+
+        SparkParameters sparkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), SparkParameters.class);
+
+        assert sparkParameters != null;
+        if (!sparkParameters.checkParameters()) {
+            throw new RuntimeException("spark task params is not valid");
+        }
+        sparkParameters.setQueue(taskProps.getQueue());
+
+        if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
+            String args = sparkParameters.getMainArgs();
+
+            /**
+             * combining local and global parameters
+             */
+            Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
+                    taskProps.getDefinedParams(),
+                    sparkParameters.getLocalParametersMap(),
+                    taskProps.getCmdTypeIfComplement(),
+                    taskProps.getScheduleTime());
+            if (paramsMap != null) {
+                args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
+            }
+            sparkParameters.setMainArgs(args);
+        }
+
+        List<String> args = new ArrayList<>();
+
+        //spark version
+        String sparkCommand = SPARK2_COMMAND;
+
+        if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
+            sparkCommand = SPARK1_COMMAND;
+        }
+
+        args.add(sparkCommand);
+
+        // other parameters
+        args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
+
+        String sparkArgs = String.join(" ", args);
+
+        logger.info("spark task command : {}", sparkArgs);
+
+        Assert.assertEquals(sparkArgs.split(" ")[0], SPARK2_COMMAND);
+
+    }
+}
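One observation on the test: spark1Params is built but never used, so only the SPARK2 branch is asserted. A possible companion test method for the SPARK1 path is sketched below (hypothetical, not part of the patch; it assumes it sits inside SparkTaskTest so the SPARK1_COMMAND/SPARK2_COMMAND constants and the imports above are available):

    @Test
    public void testSparkTaskSelectsSpark1Command() {
        // Same shape as spark1Params above, trimmed to the fields checkParameters() needs
        String spark1Params = "{"
                + "\"mainClass\":\"basicetl.GlobalUserCar\", "
                + "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, "
                + "\"programType\":\"SCALA\", "
                + "\"sparkVersion\":\"SPARK1\""
                + "}";

        SparkParameters sparkParameters = JSONUtils.parseObject(spark1Params, SparkParameters.class);
        Assert.assertNotNull(sparkParameters);
        Assert.assertTrue(sparkParameters.checkParameters());

        // Mirror of the version switch in SparkTask.buildCommand()
        String sparkCommand = SPARK2_COMMAND;
        if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
            sparkCommand = SPARK1_COMMAND;
        }

        Assert.assertEquals(SPARK1_COMMAND, sparkCommand);
    }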
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
index 939dccb0f5..2457a23faa 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
@@ -32,6 +32,22 @@
         </x-select>
       </div>
     </m-list-box>
+    <m-list-box>
+      <div slot="text">{{$t('Spark Version')}}</div>
+      <div slot="content">
+        <x-select
+                style="width: 130px;"
+                v-model="sparkVersion"
+                :disabled="isDetails">
+          <x-option
+                  v-for="city in sparkVersionList"
+                  :key="city.code"
+                  :value="city.code"
+                  :label="city.code">
+          </x-option>
+        </x-select>
+      </div>
+    </m-list-box>
     <m-list-box>
       <div slot="text">{{$t('Main class')}}</div>
       <div slot="content">
@@ -224,7 +240,11 @@
       // Program type
       programType: 'SCALA',
       // Program type(List)
-      programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }]
+      programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }],
+      // Spark version
+      sparkVersion: 'SPARK2',
+      // Spark version(List)
+      sparkVersionList: [{ code: 'SPARK2' }, { code: 'SPARK1' }]
     }
   },
   props: {
@@ -318,7 +338,8 @@
         executorCores: this.executorCores,
         mainArgs: this.mainArgs,
         others: this.others,
-        programType: this.programType
+        programType: this.programType,
+        sparkVersion: this.sparkVersion
       })
       return true
     },
@@ -366,6 +387,7 @@
       this.mainArgs = o.params.mainArgs || ''
       this.others = o.params.others
       this.programType = o.params.programType || 'SCALA'
+      this.sparkVersion = o.params.sparkVersion || 'SPARK2'
 
       // backfill resourceList
       let resourceList = o.params.resourceList || []
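On backward compatibility: the form defaults sparkVersion to 'SPARK2' and backfills that value when an existing task is opened and saved again, but a task definition stored before this change carries no sparkVersion at all, so the stricter checkParameters() above would reject it until it is re-saved. A minimal sketch of that legacy case (the class name is made up; SparkParameters and JSONUtils are the real classes used elsewhere in this patch):

    import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
    import org.apache.dolphinscheduler.common.utils.JSONUtils;

    public class LegacySparkParamsSketch {

        public static void main(String[] args) {
            // A params string in the pre-change shape: no "sparkVersion" entry
            String legacyParams = "{"
                    + "\"mainClass\":\"basicetl.GlobalUserCar\","
                    + "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"},"
                    + "\"programType\":\"SCALA\""
                    + "}";

            SparkParameters parameters = JSONUtils.parseObject(legacyParams, SparkParameters.class);

            // sparkVersion stays null, so the strengthened check rejects this definition
            System.out.println(parameters != null && parameters.checkParameters()); // false
        }
    }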
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index fb87146dff..dfc7c72c49 100644
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -82,7 +82,6 @@ export default {
   'Please enter a positive integer': 'Please enter a positive integer',
   'Program Type': 'Program Type',
   'Main class': 'Main class',
-  'Please enter main class': 'Please enter main class',
   'Main jar package': 'Main jar package',
   'Please enter main jar package': 'Please enter main jar package',
   'Command-line parameters': 'Command-line parameters',
@@ -506,5 +505,6 @@ export default {
   'There is no data for this period of time': 'There is no data for this period of time',
   'IP address cannot be empty': 'IP address cannot be empty',
   'Please enter the correct IP': 'Please enter the correct IP',
-  'Please generate token': 'Please generate token'
+  'Please generate token': 'Please generate token',
+  'Spark Version': 'Spark Version'
 }
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index 11fd0940bf..7df7cdd230 100644
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -505,5 +505,6 @@ export default {
   'There is no data for this period of time': '该时间段无数据',
   'IP address cannot be empty': 'IP地址不能为空',
   'Please enter the correct IP': '请输入正确的IP',
-  'Please generate token': '请生成Token'
+  'Please generate token': '请生成Token',
+  'Spark Version': 'Spark版本'
 }