
[Bug] [spark-sql] In spark-sql, select both SPARK1 and SPARK2 versions and execute ${SPARK_HOME2}/bin/spark-sql (#11721)

Selecting SPARK1 or SPARK2 now executes the spark-sql binary of the matching version, instead of always falling back to ${SPARK_HOME2}/bin/spark-sql.
3.2.0-release
limaiwang 2 years ago committed by GitHub
parent commit 25b78a8003
  1. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java (65 lines changed)
  2. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java (13 lines changed)
  3. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java (35 lines changed)
  4. dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java (97 lines changed)
  5. dolphinscheduler-worker/src/main/bin/start.sh (2 lines changed)
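
What the patch does, in brief: before this change, any SQL-type task resolved to SparkVersion.SPARKSQL, whose command string is hard-wired to ${SPARK_HOME2}/bin/spark-sql, so selecting SPARK1 had no effect on spark-sql tasks. The new SparkCommand enum pairs each binary with its SparkVersion, and SparkTask now honors the configured version for both spark-submit and spark-sql. A minimal sketch of the resolution, equivalent to the new buildCommand() branch shown below (the resolveCommand helper is illustrative, not part of the patch):

    package org.apache.dolphinscheduler.plugin.task.spark;

    // Illustrative helper, not part of the patch: resolves the Spark binary
    // the same way the new SparkTask.buildCommand() logic does.
    final class SparkCommandResolver {

        private SparkCommandResolver() {
        }

        static String resolveCommand(ProgramType programType, String sparkVersion) {
            if (programType == ProgramType.SQL) {
                // SQL tasks run bin/spark-sql of the selected version
                return SparkVersion.SPARK1.name().equals(sparkVersion)
                        ? SparkCommand.SPARK1SQL.getCommand()    // ${SPARK_HOME1}/bin/spark-sql
                        : SparkCommand.SPARK2SQL.getCommand();   // ${SPARK_HOME2}/bin/spark-sql
            }
            // everything else runs bin/spark-submit of the selected version
            return SparkVersion.SPARK1.name().equals(sparkVersion)
                    ? SparkCommand.SPARK1SUBMIT.getCommand()     // ${SPARK_HOME1}/bin/spark-submit
                    : SparkCommand.SPARK2SUBMIT.getCommand();    // ${SPARK_HOME2}/bin/spark-submit
        }
    }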

65
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java

@@ -0,0 +1,65 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.spark;

public enum SparkCommand {

    /**
     * 0 SPARK1SUBMIT
     * 1 SPARK2SUBMIT
     * 2 SPARK1SQL
     * 3 SPARK2SQL
     */
    SPARK1SUBMIT(0, "SPARK1SUBMIT", "${SPARK_HOME1}/bin/spark-submit", SparkVersion.SPARK1),
    SPARK2SUBMIT(1, "SPARK2SUBMIT", "${SPARK_HOME2}/bin/spark-submit", SparkVersion.SPARK2),
    SPARK1SQL(2, "SPARK1SQL", "${SPARK_HOME1}/bin/spark-sql", SparkVersion.SPARK1),
    SPARK2SQL(3, "SPARK2SQL", "${SPARK_HOME2}/bin/spark-sql", SparkVersion.SPARK2);

    private final int code;
    private final String descp;

    /**
     * usage: spark-submit [options] <app jar | python file> [app arguments]
     */
    private final String command;

    private final SparkVersion sparkVersion;

    SparkCommand(int code, String descp, String command, SparkVersion sparkVersion) {
        this.code = code;
        this.descp = descp;
        this.command = command;
        this.sparkVersion = sparkVersion;
    }

    public int getCode() {
        return code;
    }

    public String getDescp() {
        return descp;
    }

    public String getCommand() {
        return command;
    }

    public SparkVersion getSparkVersion() {
        return sparkVersion;
    }
}
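
To make the version-to-binary pairing concrete, a throwaway snippet (illustrative only, not part of the commit) that prints each constant's mapping:

    // Illustrative only: dump the mapping defined by the enum above.
    for (SparkCommand c : SparkCommand.values()) {
        System.out.printf("%-12s -> %s (%s)%n", c.name(), c.getCommand(), c.getSparkVersion());
    }
    // SPARK1SUBMIT -> ${SPARK_HOME1}/bin/spark-submit (SPARK1)
    // SPARK2SUBMIT -> ${SPARK_HOME2}/bin/spark-submit (SPARK2)
    // SPARK1SQL    -> ${SPARK_HOME1}/bin/spark-sql (SPARK1)
    // SPARK2SQL    -> ${SPARK_HOME2}/bin/spark-sql (SPARK2)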

13
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;

@@ -42,7 +41,6 @@ import java.nio.file.attribute.FileAttribute;
 import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -100,16 +98,19 @@ public class SparkTask extends AbstractYarnTask {
         List<String> args = new ArrayList<>();

         // spark version
-        String sparkCommand = SparkVersion.SPARK2.getCommand();
+        String sparkCommand = SparkCommand.SPARK2SUBMIT.getCommand();

-        if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
-            sparkCommand = SparkVersion.SPARK1.getCommand();
+        // If the programType is non-SQL, execute bin/spark-submit
+        if (SparkCommand.SPARK1SUBMIT.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
+            sparkCommand = SparkCommand.SPARK1SUBMIT.getCommand();
         }

         // If the programType is SQL, execute bin/spark-sql
         if (sparkParameters.getProgramType() == ProgramType.SQL) {
-            sparkCommand = SparkVersion.SPARKSQL.getCommand();
+            sparkCommand = SparkCommand.SPARK2SQL.getCommand();
+            if (SparkCommand.SPARK1SQL.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
+                sparkCommand = SparkCommand.SPARK1SQL.getCommand();
+            }
         }

         args.add(sparkCommand);
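
One behavioral note, read off the branch above rather than stated anywhere in the commit: the SPARK2 constants act as the default, so an empty or unrecognized sparkVersion string silently runs the ${SPARK_HOME2} binaries:

    // Derived from the selection logic above (illustrative, not part of the patch):
    // programType == SQL, sparkVersion equals "SPARK1"  -> ${SPARK_HOME1}/bin/spark-sql
    // programType == SQL, any other sparkVersion        -> ${SPARK_HOME2}/bin/spark-sql
    // other programType, sparkVersion equals "SPARK1"   -> ${SPARK_HOME1}/bin/spark-submit
    // other programType, any other sparkVersion         -> ${SPARK_HOME2}/bin/spark-submit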

35
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java

@@ -18,38 +18,5 @@
 package org.apache.dolphinscheduler.plugin.task.spark;

 public enum SparkVersion {
-    /**
-     * 0 SPARK1
-     * 1 SPARK2
-     * 2 SPARKSQL
-     */
-    SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"),
-    SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"),
-    SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql");
-
-    private final int code;
-    private final String descp;
-
-    /**
-     * usage: spark-submit [options] <app jar | python file> [app arguments]
-     */
-    private final String command;
-
-    SparkVersion(int code, String descp, String command) {
-        this.code = code;
-        this.descp = descp;
-        this.command = command;
-    }
-
-    public int getCode() {
-        return code;
-    }
-
-    public String getDescp() {
-        return descp;
-    }
-
-    public String getCommand() {
-        return command;
-    }
+    SPARK1, SPARK2
 }
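
With the command strings moved onto SparkCommand, SparkVersion shrinks to a plain marker enum. A hypothetical caller that wants to parse the user-supplied version string defensively might do the following (illustrative only; the patch itself compares names via String.equals and defaults to the SPARK2 binaries):

    // Illustrative only: defensive parse of the configured version string.
    SparkVersion version;
    try {
        version = SparkVersion.valueOf(sparkParameters.getSparkVersion());
    } catch (IllegalArgumentException | NullPointerException ex) {
        version = SparkVersion.SPARK2; // mirrors the patch's fallback behavior
    }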

97
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java

@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.spark;
 import java.util.Collections;

 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;

 import org.junit.Assert;
@@ -42,8 +43,8 @@ import static org.powermock.api.mockito.PowerMockito.when;
 public class SparkTaskTest {

     @Test
-    public void testBuildCommandWithSparkSql() throws Exception {
-        String parameters = buildSparkParametersWithSparkSql();
+    public void testBuildCommandWithSpark2Sql() throws Exception {
+        String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK2");
         TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
         when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
         when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
@@ -63,22 +64,108 @@ public class SparkTaskTest {
                 "--name sparksql " +
                 "-f /tmp/5536_node.sql");
     }
+
+    @Test
+    public void testBuildCommandWithSpark1Sql() throws Exception {
+        String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK1");
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+        when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+
+        SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+        sparkTask.init();
+        Assert.assertEquals(sparkTask.buildCommand(),
+                "${SPARK_HOME1}/bin/spark-sql " +
+                "--master yarn " +
+                "--deploy-mode client " +
+                "--driver-cores 1 " +
+                "--driver-memory 512M " +
+                "--num-executors 2 " +
+                "--executor-cores 2 " +
+                "--executor-memory 1G " +
+                "--name sparksql " +
+                "-f /tmp/5536_node.sql");
+    }
+
-    private String buildSparkParametersWithSparkSql() {
+    @Test
+    public void testBuildCommandWithSpark2Submit() throws Exception {
+        String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK2");
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+        when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+
+        SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+        sparkTask.init();
+        Assert.assertEquals(sparkTask.buildCommand(),
+                "${SPARK_HOME2}/bin/spark-submit " +
+                "--master yarn " +
+                "--deploy-mode client " +
+                "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
+                "--driver-cores 1 " +
+                "--driver-memory 512M " +
+                "--num-executors 2 " +
+                "--executor-cores 2 " +
+                "--executor-memory 1G " +
+                "--name spark " +
+                "lib/dolphinscheduler-task-spark.jar");
+    }
+
+    @Test
+    public void testBuildCommandWithSpark1Submit() throws Exception {
+        String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK1");
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+        when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+
+        SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+        sparkTask.init();
+        Assert.assertEquals(sparkTask.buildCommand(),
+                "${SPARK_HOME1}/bin/spark-submit " +
+                "--master yarn " +
+                "--deploy-mode client " +
+                "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
+                "--driver-cores 1 " +
+                "--driver-memory 512M " +
+                "--num-executors 2 " +
+                "--executor-cores 2 " +
+                "--executor-memory 1G " +
+                "--name spark " +
+                "lib/dolphinscheduler-task-spark.jar");
+    }
+
+    private String buildSparkParametersWithSparkSql(ProgramType programType, String sparkVersion) {
         SparkParameters sparkParameters = new SparkParameters();
         sparkParameters.setLocalParams(Collections.emptyList());
         sparkParameters.setRawScript("selcet 11111;");
-        sparkParameters.setProgramType(ProgramType.SQL);
+        sparkParameters.setProgramType(programType);
         sparkParameters.setMainClass("");
         sparkParameters.setDeployMode("client");
         sparkParameters.setAppName("sparksql");
         sparkParameters.setOthers("");
-        sparkParameters.setSparkVersion("SPARK2");
+        sparkParameters.setSparkVersion(sparkVersion);
         sparkParameters.setDriverCores(1);
         sparkParameters.setDriverMemory("512M");
         sparkParameters.setNumExecutors(2);
         sparkParameters.setExecutorMemory("1G");
         sparkParameters.setExecutorCores(2);
         return JSONUtils.toJsonString(sparkParameters);
     }
+
+    private String buildSparkParametersWithSparkSubmit(ProgramType programType, String sparkVersion) {
+        SparkParameters sparkParameters = new SparkParameters();
+        sparkParameters.setLocalParams(Collections.emptyList());
+        sparkParameters.setProgramType(programType);
+        sparkParameters.setMainClass("org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest");
+        sparkParameters.setDeployMode("client");
+        sparkParameters.setAppName("spark");
+        sparkParameters.setOthers("");
+        sparkParameters.setSparkVersion(sparkVersion);
+        sparkParameters.setDriverCores(1);
+        sparkParameters.setDriverMemory("512M");
+        sparkParameters.setNumExecutors(2);
+        sparkParameters.setExecutorMemory("1G");
+        sparkParameters.setExecutorCores(2);
+        ResourceInfo resourceInfo = new ResourceInfo();
+        resourceInfo.setId(1);
+        resourceInfo.setRes("dolphinscheduler-task-spark.jar");
+        resourceInfo.setResourceName("/lib/dolphinscheduler-task-spark.jar");
+        sparkParameters.setMainJar(resourceInfo);
+        return JSONUtils.toJsonString(sparkParameters);
+    }

2
dolphinscheduler-worker/src/main/bin/start.sh

@@ -21,7 +21,7 @@ DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}

 source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"

-chmod -R 700 ${DOLPHINSCHEDULER_HOME}/config
+chmod -R 700 ${DOLPHINSCHEDULER_HOME}/conf

 export DOLPHINSCHEDULER_WORK_HOME=${DOLPHINSCHEDULER_HOME}

 JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}
