From e74932cfc21c95d7824cd55dd3cb6e441e2b8305 Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Tue, 9 Mar 2021 12:51:35 +0800 Subject: [PATCH] [Feature-4960][Spark] Add name input for Spark and improve Spark & MR args (#4959) * [Improvement][Spark] Improve spark args and add name input * [Improvement][MR] Improve map reduce args * [Improvement][Spark] Fix check style * [Improvement][Spark] Fix check style for spark, flink and mr --- .../dolphinscheduler/common/Constants.java | 4 + .../common/task/spark/SparkParameters.java | 369 +++++++++--------- .../server/utils/FlinkArgsUtils.java | 1 - .../server/utils/SparkArgsUtils.java | 43 +- .../server/worker/task/mr/MapReduceTask.java | 40 +- .../server/worker/task/spark/SparkTask.java | 3 + .../server/utils/FlinkArgsUtilsTest.java | 24 +- .../server/utils/SparkArgsUtilsTest.java | 65 +-- .../dag/_source/formModel/tasks/spark.vue | 17 + 9 files changed, 305 insertions(+), 261 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 4c43c0cd08..6e5cf66495 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -604,6 +604,10 @@ public final class Constants { */ public static final String EXECUTOR_MEMORY = "--executor-memory"; + /** + * --name NAME + */ + public static final String SPARK_NAME = "--name"; /** * --queue QUEUE diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java index 32a2a6b05d..947f09e9b6 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.task.spark; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import java.util.ArrayList; import java.util.List; @@ -29,203 +29,214 @@ import java.util.List; */ public class SparkParameters extends AbstractParameters { - /** - * major jar - */ - private ResourceInfo mainJar; - - /** - * major class - */ - private String mainClass; - - /** - * deploy mode - */ - private String deployMode; - - /** - * arguments - */ - private String mainArgs; - - /** - * driver-cores Number of cores used by the driver, only in cluster mode - */ - private int driverCores; - - /** - * driver-memory Memory for driver - */ - - private String driverMemory; - - /** - * num-executors Number of executors to launch - */ - private int numExecutors; - - /** - * executor-cores Number of cores per executor - */ - private int executorCores; - - /** - * Memory per executor - */ - private String executorMemory; - - /** - * resource list - */ - private List resourceList = new ArrayList<>(); - - /** - * The YARN queue to submit to - */ - private String queue; - - /** - * other arguments - */ - private String others; - - /** - * program type - * 0 JAVA,1 SCALA,2 PYTHON - */ - private ProgramType programType; - - /** - * spark version - */ - private String sparkVersion; - - public ResourceInfo getMainJar() { - return mainJar; - } - - public void setMainJar(ResourceInfo mainJar) { - this.mainJar = mainJar; - } - - public String getMainClass() { - return mainClass; - } - - public void setMainClass(String mainClass) { - this.mainClass = mainClass; - } - - public String getDeployMode() { - return deployMode; - } - - public void setDeployMode(String deployMode) { - this.deployMode = deployMode; - } - - public String getMainArgs() { - return mainArgs; - } - - public void setMainArgs(String mainArgs) { - this.mainArgs = mainArgs; - } - - public int getDriverCores() { - return driverCores; - } - - public void setDriverCores(int driverCores) { - this.driverCores = driverCores; - } - - public String getDriverMemory() { - return driverMemory; - } - - public void setDriverMemory(String driverMemory) { - this.driverMemory = driverMemory; - } + /** + * main jar + */ + private ResourceInfo mainJar; + + /** + * main class + */ + private String mainClass; + + /** + * deploy mode + */ + private String deployMode; + + /** + * arguments + */ + private String mainArgs; + + /** + * driver-cores Number of cores used by the driver, only in cluster mode + */ + private int driverCores; + + /** + * driver-memory Memory for driver + */ + + private String driverMemory; + + /** + * num-executors Number of executors to launch + */ + private int numExecutors; + + /** + * executor-cores Number of cores per executor + */ + private int executorCores; + + /** + * Memory per executor + */ + private String executorMemory; + + /** + * app name + */ + private String appName; + + /** + * The YARN queue to submit to + */ + private String queue; + + /** + * other arguments + */ + private String others; + + /** + * program type + * 0 JAVA,1 SCALA,2 PYTHON + */ + private ProgramType programType; + + /** + * spark version + */ + private String sparkVersion; + + /** + * resource list + */ + private List resourceList = new ArrayList<>(); + + public ResourceInfo getMainJar() { + return mainJar; + } + + public void setMainJar(ResourceInfo mainJar) { + this.mainJar = mainJar; + } + + public String getMainClass() { + return mainClass; + } + + public void setMainClass(String mainClass) { + this.mainClass = mainClass; + } + + public String getDeployMode() { + return deployMode; + } + + public void setDeployMode(String deployMode) { + this.deployMode = deployMode; + } + + public String getMainArgs() { + return mainArgs; + } + + public void setMainArgs(String mainArgs) { + this.mainArgs = mainArgs; + } + + public int getDriverCores() { + return driverCores; + } + + public void setDriverCores(int driverCores) { + this.driverCores = driverCores; + } + + public String getDriverMemory() { + return driverMemory; + } + + public void setDriverMemory(String driverMemory) { + this.driverMemory = driverMemory; + } - public int getNumExecutors() { - return numExecutors; - } + public int getNumExecutors() { + return numExecutors; + } - public void setNumExecutors(int numExecutors) { - this.numExecutors = numExecutors; - } + public void setNumExecutors(int numExecutors) { + this.numExecutors = numExecutors; + } - public int getExecutorCores() { - return executorCores; - } - - public void setExecutorCores(int executorCores) { - this.executorCores = executorCores; - } + public int getExecutorCores() { + return executorCores; + } - public String getExecutorMemory() { - return executorMemory; - } + public void setExecutorCores(int executorCores) { + this.executorCores = executorCores; + } - public void setExecutorMemory(String executorMemory) { - this.executorMemory = executorMemory; - } + public String getExecutorMemory() { + return executorMemory; + } + public void setExecutorMemory(String executorMemory) { + this.executorMemory = executorMemory; + } - public String getQueue() { - return queue; - } + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } - public void setQueue(String queue) { - this.queue = queue; - } + public String getQueue() { + return queue; + } - public List getResourceList() { - return resourceList; - } + public void setQueue(String queue) { + this.queue = queue; + } - public void setResourceList(List resourceList) { - this.resourceList = resourceList; - } + public String getOthers() { + return others; + } - public String getOthers() { - return others; - } + public void setOthers(String others) { + this.others = others; + } - public void setOthers(String others) { - this.others = others; - } + public List getResourceList() { + return resourceList; + } - public ProgramType getProgramType() { - return programType; - } + public void setResourceList(List resourceList) { + this.resourceList = resourceList; + } - public void setProgramType(ProgramType programType) { - this.programType = programType; - } + public ProgramType getProgramType() { + return programType; + } - public String getSparkVersion() { - return sparkVersion; - } + public void setProgramType(ProgramType programType) { + this.programType = programType; + } - public void setSparkVersion(String sparkVersion) { - this.sparkVersion = sparkVersion; - } + public String getSparkVersion() { + return sparkVersion; + } - @Override - public boolean checkParameters() { - return mainJar != null && programType != null; - } + public void setSparkVersion(String sparkVersion) { + this.sparkVersion = sparkVersion; + } - @Override - public List getResourceFilesList() { - if (mainJar != null && !resourceList.contains(mainJar)) { - resourceList.add(mainJar); + @Override + public boolean checkParameters() { + return mainJar != null && programType != null; } - return resourceList; - } + @Override + public List getResourceFilesList() { + if (mainJar != null && !resourceList.contains(mainJar)) { + resourceList.add(mainJar); + } + return resourceList; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index 2d5198c227..8b3b691abc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -129,5 +129,4 @@ public class FlinkArgsUtils { return args; } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java index 0c68016db0..76828f346d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java @@ -45,19 +45,14 @@ public class SparkArgsUtils { */ public static List buildArgs(SparkParameters param) { List args = new ArrayList<>(); - String deployMode = SPARK_CLUSTER; - args.add(Constants.MASTER); - if (StringUtils.isNotEmpty(param.getDeployMode())) { - deployMode = param.getDeployMode(); - } + String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER; if (!SPARK_LOCAL.equals(deployMode)) { args.add(SPARK_ON_YARN); args.add(Constants.DEPLOY_MODE); } - - args.add(param.getDeployMode()); + args.add(deployMode); ProgramType type = param.getProgramType(); String mainClass = param.getMainClass(); @@ -67,7 +62,7 @@ public class SparkArgsUtils { } int driverCores = param.getDriverCores(); - if (driverCores != 0) { + if (driverCores > 0) { args.add(Constants.DRIVER_CORES); args.add(String.format("%d", driverCores)); } @@ -79,13 +74,13 @@ public class SparkArgsUtils { } int numExecutors = param.getNumExecutors(); - if (numExecutors != 0) { + if (numExecutors > 0) { args.add(Constants.NUM_EXECUTORS); args.add(String.format("%d", numExecutors)); } int executorCores = param.getExecutorCores(); - if (executorCores != 0) { + if (executorCores > 0) { args.add(Constants.EXECUTOR_CORES); args.add(String.format("%d", executorCores)); } @@ -96,22 +91,26 @@ public class SparkArgsUtils { args.add(executorMemory); } - // --files --conf --libjar ... - String others = param.getOthers(); - String queue = param.getQueue(); - if (StringUtils.isNotEmpty(others)) { + String appName = param.getAppName(); + if (StringUtils.isNotEmpty(appName)) { + args.add(Constants.SPARK_NAME); + args.add(ArgsUtils.escape(appName)); + } - if (!others.contains(Constants.SPARK_QUEUE) && StringUtils.isNotEmpty(queue)) { - args.add(Constants.SPARK_QUEUE); - args.add(queue); + String others = param.getOthers(); + if (!SPARK_LOCAL.equals(deployMode)) { + if (StringUtils.isEmpty(others) || !others.contains(Constants.SPARK_QUEUE)) { + String queue = param.getQueue(); + if (StringUtils.isNotEmpty(queue)) { + args.add(Constants.SPARK_QUEUE); + args.add(queue); + } } + } + // --conf --files --jars --packages + if (StringUtils.isNotEmpty(others)) { args.add(others); - - } else if (StringUtils.isNotEmpty(queue)) { - args.add(Constants.SPARK_QUEUE); - args.add(queue); - } ResourceInfo mainJar = param.getMainJar(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index 3a8f75a64f..43a7079dcf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -23,24 +23,30 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; -import org.slf4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.slf4j.Logger; + /** * mapreduce task */ public class MapReduceTask extends AbstractYarnTask { + /** + * map reduce command + * usage: hadoop jar [mainClass] [GENERIC_OPTIONS] args... + */ + private static final String MAP_REDUCE_COMMAND = Constants.HADOOP; /** * mapreduce parameters @@ -77,7 +83,6 @@ public class MapReduceTask extends AbstractYarnTask { mapreduceParameters.setQueue(taskExecutionContext.getQueue()); setMainJarName(); - // replace placeholder Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), @@ -85,10 +90,10 @@ public class MapReduceTask extends AbstractYarnTask { CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); - if (paramsMap != null){ + if (paramsMap != null) { String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap)); mapreduceParameters.setMainArgs(args); - if(mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON){ + if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) { String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(), ParamUtils.convert(paramsMap)); mapreduceParameters.setOthers(others); } @@ -102,9 +107,13 @@ public class MapReduceTask extends AbstractYarnTask { */ @Override protected String buildCommand() throws Exception { - List parameterList = buildParameters(mapreduceParameters); + // hadoop jar [mainClass] [GENERIC_OPTIONS] args... + List args = new ArrayList<>(); + args.add(MAP_REDUCE_COMMAND); + + args.addAll(buildParameters(mapreduceParameters)); - String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList), + String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); logger.info("mapreduce task command: {}", command); @@ -143,21 +152,18 @@ public class MapReduceTask extends AbstractYarnTask { * @param mapreduceParameters mapreduce parameters * @return parameter list */ - private List buildParameters(MapreduceParameters mapreduceParameters){ - + private List buildParameters(MapreduceParameters mapreduceParameters) { List result = new ArrayList<>(); - result.add(Constants.HADOOP); - // main jar - if(mapreduceParameters.getMainJar()!= null){ + if (mapreduceParameters.getMainJar() != null) { result.add(Constants.JAR); result.add(mapreduceParameters.getMainJar().getRes()); } // main class - if(!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType()) - && StringUtils.isNotEmpty(mapreduceParameters.getMainClass())){ + if (!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType()) + && StringUtils.isNotEmpty(mapreduceParameters.getMainClass())) { result.add(mapreduceParameters.getMainClass()); } @@ -170,13 +176,13 @@ public class MapReduceTask extends AbstractYarnTask { } result.add(mapreduceParameters.getOthers()); - }else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) { + } else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) { result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue())); } // command args - if(StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())){ + if (StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())) { result.add(mapreduceParameters.getMainArgs()); } return result; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index 4e1a4d5356..3a27399d2e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -44,11 +44,13 @@ public class SparkTask extends AbstractYarnTask { /** * spark1 command + * usage: spark-submit [options] [app arguments] */ private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit"; /** * spark2 command + * usage: spark-submit [options] [app arguments] */ private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit"; @@ -93,6 +95,7 @@ public class SparkTask extends AbstractYarnTask { */ @Override protected String buildCommand() { + // spark-submit [options] [app arguments] List args = new ArrayList<>(); //spark version diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java index f03062835e..dd12029765 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java @@ -98,35 +98,35 @@ public class FlinkArgsUtilsTest { assertEquals("yarn-cluster", result.get(1)); assertEquals("-ys", result.get(2)); - assertSame(Integer.valueOf(result.get(3)),slot); + assertSame(slot, Integer.valueOf(result.get(3))); - assertEquals("-ynm",result.get(4)); - assertEquals(result.get(5),appName); + assertEquals("-ynm", result.get(4)); + assertEquals(appName, result.get(5)); assertEquals("-yn", result.get(6)); - assertSame(Integer.valueOf(result.get(7)),taskManager); + assertSame(taskManager, Integer.valueOf(result.get(7))); assertEquals("-yjm", result.get(8)); - assertEquals(result.get(9),jobManagerMemory); + assertEquals(jobManagerMemory, result.get(9)); assertEquals("-ytm", result.get(10)); - assertEquals(result.get(11),taskManagerMemory); + assertEquals(taskManagerMemory, result.get(11)); assertEquals("-yqu", result.get(12)); - assertEquals(result.get(13),queue); + assertEquals(queue, result.get(13)); assertEquals("-p", result.get(14)); - assertSame(Integer.valueOf(result.get(15)),parallelism); + assertSame(parallelism, Integer.valueOf(result.get(15))); assertEquals("-sae", result.get(16)); - assertEquals(result.get(17),others); + assertEquals(others, result.get(17)); assertEquals("-c", result.get(18)); - assertEquals(result.get(19),mainClass); + assertEquals(mainClass, result.get(19)); - assertEquals(result.get(20),mainJar.getRes()); - assertEquals(result.get(21),mainArgs); + assertEquals(mainJar.getRes(), result.get(20)); + assertEquals(mainArgs, result.get(21)); //Others param without -yqu FlinkParameters param1 = new FlinkParameters(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java index 6e55fa731b..f76c2eaa31 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java @@ -17,19 +17,20 @@ package org.apache.dolphinscheduler.server.utils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.spark.SparkParameters; + +import java.util.List; + import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - /** * Test SparkArgsUtils */ @@ -48,12 +49,11 @@ public class SparkArgsUtilsTest { public int executorCores = 6; public String sparkVersion = "SPARK1"; public int numExecutors = 4; + public String appName = "spark test"; public String queue = "queue1"; - @Before - public void setUp() throws Exception { - + public void setUp() { ResourceInfo main = new ResourceInfo(); main.setRes("testspark-1.0.0-SNAPSHOT.jar"); mainJar = main; @@ -78,6 +78,7 @@ public class SparkArgsUtilsTest { param.setProgramType(programType); param.setSparkVersion(sparkVersion); param.setMainArgs(mainArgs); + param.setAppName(appName); param.setQueue(queue); //Invoke buildArgs @@ -87,42 +88,46 @@ public class SparkArgsUtilsTest { } //Expected values and order - assertEquals(result.size(),20); + assertEquals(22, result.size()); + + assertEquals("--master", result.get(0)); + assertEquals("yarn", result.get(1)); + + assertEquals("--deploy-mode", result.get(2)); + assertEquals(mode, result.get(3)); - assertEquals(result.get(0),"--master"); - assertEquals(result.get(1),"yarn"); + assertEquals("--class", result.get(4)); + assertEquals(mainClass, result.get(5)); - assertEquals(result.get(2),"--deploy-mode"); - assertEquals(result.get(3),mode); + assertEquals("--driver-cores", result.get(6)); + assertSame(driverCores, Integer.valueOf(result.get(7))); - assertEquals(result.get(4),"--class"); - assertEquals(result.get(5),mainClass); + assertEquals("--driver-memory", result.get(8)); + assertEquals(driverMemory, result.get(9)); - assertEquals(result.get(6),"--driver-cores"); - assertSame(Integer.valueOf(result.get(7)),driverCores); + assertEquals("--num-executors", result.get(10)); + assertSame(numExecutors, Integer.valueOf(result.get(11))); - assertEquals(result.get(8),"--driver-memory"); - assertEquals(result.get(9),driverMemory); + assertEquals("--executor-cores", result.get(12)); + assertSame(executorCores, Integer.valueOf(result.get(13))); - assertEquals(result.get(10),"--num-executors"); - assertSame(Integer.valueOf(result.get(11)),numExecutors); + assertEquals("--executor-memory", result.get(14)); + assertEquals(executorMemory, result.get(15)); - assertEquals(result.get(12),"--executor-cores"); - assertSame(Integer.valueOf(result.get(13)),executorCores); + assertEquals("--name", result.get(16)); + assertEquals(ArgsUtils.escape(appName), result.get(17)); - assertEquals(result.get(14),"--executor-memory"); - assertEquals(result.get(15),executorMemory); + assertEquals("--queue", result.get(18)); + assertEquals(queue, result.get(19)); - assertEquals(result.get(16),"--queue"); - assertEquals(result.get(17),queue); - assertEquals(result.get(18),mainJar.getRes()); - assertEquals(result.get(19),mainArgs); + assertEquals(mainJar.getRes(), result.get(20)); + assertEquals(mainArgs, result.get(21)); //Others param without --queue SparkParameters param1 = new SparkParameters(); param1.setOthers("--files xxx/hive-site.xml"); param1.setQueue(queue); result = SparkArgsUtils.buildArgs(param1); - assertEquals(result.size(),7); + assertEquals(7, result.size()); } } \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue index a2192399a5..5fbc767e35 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue @@ -80,6 +80,18 @@ + +
{{$t('App Name')}}
+
+ + +
+
{{$t('Driver Cores')}}
@@ -223,6 +235,8 @@ executorMemory: '2G', // Executor cores executorCores: 2, + // Spark app name + appName: '', // Main arguments mainArgs: '', // Option parameters @@ -448,6 +462,7 @@ numExecutors: this.numExecutors, executorMemory: this.executorMemory, executorCores: this.executorCores, + appName: this.appName, mainArgs: this.mainArgs, others: this.others, programType: this.programType, @@ -512,6 +527,7 @@ numExecutors: this.numExecutors, executorMemory: this.executorMemory, executorCores: this.executorCores, + appName: this.appName, mainArgs: this.mainArgs, others: this.others, programType: this.programType, @@ -544,6 +560,7 @@ this.numExecutors = o.params.numExecutors || 2 this.executorMemory = o.params.executorMemory || '2G' this.executorCores = o.params.executorCores || 2 + this.appName = o.params.appName || '' this.mainArgs = o.params.mainArgs || '' this.others = o.params.others this.programType = o.params.programType || 'SCALA'