diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java new file mode 100644 index 0000000000..11609f37f2 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.spark; + +public enum SparkCommand { + + /** + * 0 SPARK1SUBMIT + * 1 SPARK2SUBMIT + * 2 SPARK1SQL + * 3 SPARK2SQL + */ + SPARK1SUBMIT(0, "SPARK1SUBMIT", "${SPARK_HOME1}/bin/spark-submit", SparkVersion.SPARK1), + SPARK2SUBMIT(1, "SPARK2SUBMIT", "${SPARK_HOME2}/bin/spark-submit", SparkVersion.SPARK2), + + SPARK1SQL(2, "SPARK1SQL", "${SPARK_HOME1}/bin/spark-sql", SparkVersion.SPARK1), + + SPARK2SQL(3, "SPARK2SQL", "${SPARK_HOME2}/bin/spark-sql", SparkVersion.SPARK2); + + private final int code; + private final String descp; + /** + * usage: spark-submit [options] [app arguments] + */ + private final String command; + private final SparkVersion sparkVersion; + + SparkCommand(int code, String descp, String command, SparkVersion sparkVersion) { + this.code = code; + this.descp = descp; + this.command = command; + this.sparkVersion = sparkVersion; + } + + public int getCode() { + return code; + } + + public String getDescp() { + return descp; + } + + public String getCommand() { + return command; + } + + public SparkVersion getSparkVersion() { + return sparkVersion; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index 01db342661..2e40ecf696 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils; -import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -42,7 +41,6 @@ import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -100,16 +98,19 @@ public class SparkTask extends AbstractYarnTask { List args = new ArrayList<>(); // spark version - String sparkCommand = SparkVersion.SPARK2.getCommand(); + String sparkCommand = SparkCommand.SPARK2SUBMIT.getCommand(); // If the programType is non-SQL, execute bin/spark-submit - if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) { - sparkCommand = SparkVersion.SPARK1.getCommand(); + if (SparkCommand.SPARK1SUBMIT.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) { + sparkCommand = SparkCommand.SPARK1SUBMIT.getCommand(); } // If the programType is SQL, execute bin/spark-sql if (sparkParameters.getProgramType() == ProgramType.SQL) { - sparkCommand = SparkVersion.SPARKSQL.getCommand(); + sparkCommand = SparkCommand.SPARK2SQL.getCommand(); + if (SparkCommand.SPARK1SQL.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) { + sparkCommand = SparkCommand.SPARK1SQL.getCommand(); + } } args.add(sparkCommand); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java index 02c357914b..baafafe2e4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java @@ -18,38 +18,5 @@ package org.apache.dolphinscheduler.plugin.task.spark; public enum SparkVersion { - - /** - * 0 SPARK1 - * 1 SPARK2 - * 2 SPARKSQL - */ - SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"), - SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"), - SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql"); - - private final int code; - private final String descp; - /** - * usage: spark-submit [options] [app arguments] - */ - private final String command; - - SparkVersion(int code, String descp, String command) { - this.code = code; - this.descp = descp; - this.command = command; - } - - public int getCode() { - return code; - } - - public String getDescp() { - return descp; - } - - public String getCommand() { - return command; - } + SPARK1, SPARK2 } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java index 9d5565ab95..4edd28fd1e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.spark; import java.util.Collections; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.junit.Assert; @@ -42,8 +43,8 @@ import static org.powermock.api.mockito.PowerMockito.when; public class SparkTaskTest { @Test - public void testBuildCommandWithSparkSql() throws Exception { - String parameters = buildSparkParametersWithSparkSql(); + public void testBuildCommandWithSpark2Sql() throws Exception { + String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK2"); TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); when(taskExecutionContext.getTaskParams()).thenReturn(parameters); when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); @@ -63,22 +64,108 @@ public class SparkTaskTest { "--name sparksql " + "-f /tmp/5536_node.sql"); } + @Test + public void testBuildCommandWithSpark1Sql() throws Exception { + String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK1"); + TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + when(taskExecutionContext.getTaskParams()).thenReturn(parameters); + when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); + when(taskExecutionContext.getTaskAppId()).thenReturn("5536"); + SparkTask sparkTask = spy(new SparkTask(taskExecutionContext)); + sparkTask.init(); + Assert.assertEquals(sparkTask.buildCommand(), + "${SPARK_HOME1}/bin/spark-sql " + + "--master yarn " + + "--deploy-mode client " + + "--driver-cores 1 " + + "--driver-memory 512M " + + "--num-executors 2 " + + "--executor-cores 2 " + + "--executor-memory 1G " + + "--name sparksql " + + "-f /tmp/5536_node.sql"); + } - private String buildSparkParametersWithSparkSql() { + @Test + public void testBuildCommandWithSpark2Submit() throws Exception { + String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK2"); + TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + when(taskExecutionContext.getTaskParams()).thenReturn(parameters); + when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); + when(taskExecutionContext.getTaskAppId()).thenReturn("5536"); + SparkTask sparkTask = spy(new SparkTask(taskExecutionContext)); + sparkTask.init(); + Assert.assertEquals(sparkTask.buildCommand(), + "${SPARK_HOME2}/bin/spark-submit " + + "--master yarn " + + "--deploy-mode client " + + "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " + + "--driver-cores 1 " + + "--driver-memory 512M " + + "--num-executors 2 " + + "--executor-cores 2 " + + "--executor-memory 1G " + + "--name spark " + + "lib/dolphinscheduler-task-spark.jar"); + } + @Test + public void testBuildCommandWithSpark1Submit() throws Exception { + String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK1"); + TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + when(taskExecutionContext.getTaskParams()).thenReturn(parameters); + when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); + when(taskExecutionContext.getTaskAppId()).thenReturn("5536"); + SparkTask sparkTask = spy(new SparkTask(taskExecutionContext)); + sparkTask.init(); + Assert.assertEquals(sparkTask.buildCommand(), + "${SPARK_HOME1}/bin/spark-submit " + + "--master yarn " + + "--deploy-mode client " + + "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " + + "--driver-cores 1 " + + "--driver-memory 512M " + + "--num-executors 2 " + + "--executor-cores 2 " + + "--executor-memory 1G " + + "--name spark " + + "lib/dolphinscheduler-task-spark.jar"); + } + private String buildSparkParametersWithSparkSql(ProgramType programType, String sparkVersion) { SparkParameters sparkParameters = new SparkParameters(); sparkParameters.setLocalParams(Collections.emptyList()); sparkParameters.setRawScript("selcet 11111;"); - sparkParameters.setProgramType(ProgramType.SQL); + sparkParameters.setProgramType(programType); sparkParameters.setMainClass(""); sparkParameters.setDeployMode("client"); sparkParameters.setAppName("sparksql"); sparkParameters.setOthers(""); - sparkParameters.setSparkVersion("SPARK2"); + sparkParameters.setSparkVersion(sparkVersion); + sparkParameters.setDriverCores(1); + sparkParameters.setDriverMemory("512M"); + sparkParameters.setNumExecutors(2); + sparkParameters.setExecutorMemory("1G"); + sparkParameters.setExecutorCores(2); + return JSONUtils.toJsonString(sparkParameters); + } + private String buildSparkParametersWithSparkSubmit(ProgramType programType, String sparkVersion) { + SparkParameters sparkParameters = new SparkParameters(); + sparkParameters.setLocalParams(Collections.emptyList()); + sparkParameters.setProgramType(programType); + sparkParameters.setMainClass("org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest"); + sparkParameters.setDeployMode("client"); + sparkParameters.setAppName("spark"); + sparkParameters.setOthers(""); + sparkParameters.setSparkVersion(sparkVersion); sparkParameters.setDriverCores(1); sparkParameters.setDriverMemory("512M"); sparkParameters.setNumExecutors(2); sparkParameters.setExecutorMemory("1G"); sparkParameters.setExecutorCores(2); + ResourceInfo resourceInfo = new ResourceInfo(); + resourceInfo.setId(1); + resourceInfo.setRes("dolphinscheduler-task-spark.jar"); + resourceInfo.setResourceName("/lib/dolphinscheduler-task-spark.jar"); + sparkParameters.setMainJar(resourceInfo); return JSONUtils.toJsonString(sparkParameters); } diff --git a/dolphinscheduler-worker/src/main/bin/start.sh b/dolphinscheduler-worker/src/main/bin/start.sh index 1a865ca6bc..56a799b500 100644 --- a/dolphinscheduler-worker/src/main/bin/start.sh +++ b/dolphinscheduler-worker/src/main/bin/start.sh @@ -21,7 +21,7 @@ DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)} source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh" -chmod -R 700 ${DOLPHINSCHEDULER_HOME}/config +chmod -R 700 ${DOLPHINSCHEDULER_HOME}/conf export DOLPHINSCHEDULER_WORK_HOME=${DOLPHINSCHEDULER_HOME} JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}