Browse Source

cherry-pick [Bug] [spark-sql] In spark-sql, select both SPARK1 and SPARK2 versions and execute /bin/spark-sql (#11971)

3.0.1-release
Kerwin 2 years ago committed by GitHub
parent
commit
2e64e1ca2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 65
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java
  2. 17
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
  3. 35
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
  4. 101
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
  5. 2
      dolphinscheduler-worker/src/main/bin/start.sh

65
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.spark;
/**
 * Launcher commands known to the Spark task plugin.
 *
 * <p>Every constant bundles four values: a numeric code, a description string,
 * the command-line template to invoke, and the {@link SparkVersion} the command
 * is associated with.
 */
public enum SparkCommand {

    /** Code 0: {@code bin/spark-submit} under {@code SPARK_HOME1}, associated with {@link SparkVersion#SPARK1}. */
    SPARK1SUBMIT(0, "SPARK1SUBMIT", "${SPARK_HOME1}/bin/spark-submit", SparkVersion.SPARK1),

    /** Code 1: {@code bin/spark-submit} under {@code SPARK_HOME2}, associated with {@link SparkVersion#SPARK2}. */
    SPARK2SUBMIT(1, "SPARK2SUBMIT", "${SPARK_HOME2}/bin/spark-submit", SparkVersion.SPARK2),

    /** Code 2: {@code bin/spark-sql} under {@code SPARK_HOME1}, associated with {@link SparkVersion#SPARK1}. */
    SPARK1SQL(2, "SPARK1SQL", "${SPARK_HOME1}/bin/spark-sql", SparkVersion.SPARK1),

    /** Code 3: {@code bin/spark-sql} under {@code SPARK_HOME2}, associated with {@link SparkVersion#SPARK2}. */
    SPARK2SQL(3, "SPARK2SQL", "${SPARK_HOME2}/bin/spark-sql", SparkVersion.SPARK2);

    /** Numeric code of the constant (0-3 for the constants declared above). */
    private final int code;

    /** Description string; for every constant declared above it equals the constant's name. */
    private final String descp;

    /**
     * Command-line template, e.g. {@code ${SPARK_HOME2}/bin/spark-submit}.
     * NOTE(review): the {@code ${...}} placeholder is presumably resolved by the
     * shell that runs the generated command - confirm against the caller.
     */
    private final String command;

    /** Spark version this command is associated with. */
    private final SparkVersion sparkVersion;

    /**
     * Stores the four attributes of a constant.
     *
     * @param commandCode numeric code
     * @param description description string
     * @param launcher    command-line template
     * @param version     associated Spark version
     */
    SparkCommand(final int commandCode,
                 final String description,
                 final String launcher,
                 final SparkVersion version) {
        this.sparkVersion = version;
        this.command = launcher;
        this.descp = description;
        this.code = commandCode;
    }

    /**
     * @return the numeric code of this constant
     */
    public int getCode() {
        return this.code;
    }

    /**
     * @return the description string of this constant
     */
    public String getDescp() {
        return this.descp;
    }

    /**
     * @return the command-line template of this constant
     */
    public String getCommand() {
        return this.command;
    }

    /**
     * @return the Spark version this command is associated with
     */
    public SparkVersion getSparkVersion() {
        return this.sparkVersion;
    }
}

17
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

@ -40,11 +40,7 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
public class SparkTask extends AbstractYarnTask {
@ -99,16 +95,19 @@ public class SparkTask extends AbstractYarnTask {
List<String> args = new ArrayList<>();
// spark version
String sparkCommand = SparkVersion.SPARK2.getCommand();
String sparkCommand = SparkCommand.SPARK2SUBMIT.getCommand();
// If the programType is non-SQL, execute bin/spark-submit
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SparkVersion.SPARK1.getCommand();
if (SparkCommand.SPARK1SUBMIT.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SparkCommand.SPARK1SUBMIT.getCommand();
}
// If the programType is SQL, execute bin/spark-sql
if (sparkParameters.getProgramType() == ProgramType.SQL) {
sparkCommand = SparkVersion.SPARKSQL.getCommand();
sparkCommand = SparkCommand.SPARK2SQL.getCommand();
if (SparkCommand.SPARK1SQL.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SparkCommand.SPARK1SQL.getCommand();
}
}
args.add(sparkCommand);

35
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java

@ -18,38 +18,5 @@
package org.apache.dolphinscheduler.plugin.task.spark;
public enum SparkVersion {
/**
* 0 SPARK1
* 1 SPARK2
* 2 SPARKSQL
*/
SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"),
SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"),
SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql");
private final int code;
private final String descp;
/**
* usage: spark-submit [options] <app jar | python file> [app arguments]
*/
private final String command;
SparkVersion(int code, String descp, String command) {
this.code = code;
this.descp = descp;
this.command = command;
}
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
public String getCommand() {
return command;
}
SPARK1, SPARK2
}

101
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java

@ -22,6 +22,7 @@ import java.util.Collections;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.junit.Assert;
@ -44,8 +45,8 @@ import static org.powermock.api.mockito.PowerMockito.when;
public class SparkTaskTest {
@Test
public void testBuildCommandWithSparkSql() throws Exception {
String parameters = buildSparkParametersWithSparkSql();
public void testBuildCommandWithSpark2Sql() throws Exception {
String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK2");
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
@ -65,22 +66,108 @@ public class SparkTaskTest {
"--name sparksql " +
"-f /tmp/5536_node.sql");
}
/**
 * Checks the command built for a SQL program type with Spark version "SPARK1":
 * it must start with the {@code ${SPARK_HOME1}/bin/spark-sql} launcher and carry
 * the resource options configured by {@code buildSparkParametersWithSparkSql}.
 */
@Test
public void testBuildCommandWithSpark1Sql() throws Exception {
    String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK1");
    TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
    when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
    when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
    when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
    SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
    sparkTask.init();

    // NOTE(review): the "-f" path presumably derives from the mocked execute path
    // ("/tmp") and task app id ("5536") - confirm against SparkTask.
    String expectedCommand =
            "${SPARK_HOME1}/bin/spark-sql "
                    + "--master yarn "
                    + "--deploy-mode client "
                    + "--driver-cores 1 "
                    + "--driver-memory 512M "
                    + "--num-executors 2 "
                    + "--executor-cores 2 "
                    + "--executor-memory 1G "
                    + "--name sparksql "
                    + "-f /tmp/5536_node.sql";

    // JUnit's contract is assertEquals(expected, actual); passing them in that
    // order keeps the "expected ... but was ..." failure message truthful.
    Assert.assertEquals(expectedCommand, sparkTask.buildCommand());
}
private String buildSparkParametersWithSparkSql() {
/**
 * Checks the command built for a SCALA program type with Spark version "SPARK2":
 * it must start with the {@code ${SPARK_HOME2}/bin/spark-submit} launcher and
 * carry the main class, resource options and main jar configured by
 * {@code buildSparkParametersWithSparkSubmit}.
 */
@Test
public void testBuildCommandWithSpark2Submit() throws Exception {
    String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK2");
    TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
    when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
    when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
    when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
    SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
    sparkTask.init();

    String expectedCommand =
            "${SPARK_HOME2}/bin/spark-submit "
                    + "--master yarn "
                    + "--deploy-mode client "
                    + "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest "
                    + "--driver-cores 1 "
                    + "--driver-memory 512M "
                    + "--num-executors 2 "
                    + "--executor-cores 2 "
                    + "--executor-memory 1G "
                    + "--name spark "
                    + "lib/dolphinscheduler-task-spark.jar";

    // JUnit's contract is assertEquals(expected, actual); passing them in that
    // order keeps the "expected ... but was ..." failure message truthful.
    Assert.assertEquals(expectedCommand, sparkTask.buildCommand());
}
/**
 * Checks the command built for a SCALA program type with Spark version "SPARK1":
 * it must start with the {@code ${SPARK_HOME1}/bin/spark-submit} launcher and
 * carry the main class, resource options and main jar configured by
 * {@code buildSparkParametersWithSparkSubmit}.
 */
@Test
public void testBuildCommandWithSpark1Submit() throws Exception {
    String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK1");
    TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
    when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
    when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
    when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
    SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
    sparkTask.init();

    String expectedCommand =
            "${SPARK_HOME1}/bin/spark-submit "
                    + "--master yarn "
                    + "--deploy-mode client "
                    + "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest "
                    + "--driver-cores 1 "
                    + "--driver-memory 512M "
                    + "--num-executors 2 "
                    + "--executor-cores 2 "
                    + "--executor-memory 1G "
                    + "--name spark "
                    + "lib/dolphinscheduler-task-spark.jar";

    // JUnit's contract is assertEquals(expected, actual); passing them in that
    // order keeps the "expected ... but was ..." failure message truthful.
    Assert.assertEquals(expectedCommand, sparkTask.buildCommand());
}
/**
 * Builds the JSON task parameters used by the spark-sql command tests.
 *
 * <p>Each property is set exactly once, so the supplied arguments are the only
 * source of the program type and Spark version that end up in the JSON.
 *
 * @param programType  program type to store in the parameters
 * @param sparkVersion Spark version name to store, e.g. {@code "SPARK1"} or {@code "SPARK2"}
 * @return the parameters serialized with {@code JSONUtils.toJsonString}
 */
private String buildSparkParametersWithSparkSql(ProgramType programType, String sparkVersion) {
    SparkParameters sparkParameters = new SparkParameters();
    sparkParameters.setLocalParams(Collections.emptyList());
    // NOTE(review): the script text is fixture data only (the keyword is misspelled);
    // the tests shown here assert on the command line, not on the script content.
    sparkParameters.setRawScript("selcet 11111;");
    sparkParameters.setProgramType(programType);
    sparkParameters.setMainClass("");
    sparkParameters.setDeployMode("client");
    sparkParameters.setAppName("sparksql");
    sparkParameters.setOthers("");
    sparkParameters.setSparkVersion(sparkVersion);
    sparkParameters.setDriverCores(1);
    sparkParameters.setDriverMemory("512M");
    sparkParameters.setNumExecutors(2);
    sparkParameters.setExecutorMemory("1G");
    sparkParameters.setExecutorCores(2);
    return JSONUtils.toJsonString(sparkParameters);
}
/**
 * Builds the JSON task parameters used by the spark-submit command tests.
 *
 * @param programType  program type to store in the parameters
 * @param sparkVersion Spark version name to store, e.g. {@code "SPARK1"} or {@code "SPARK2"}
 * @return the parameters serialized with {@code JSONUtils.toJsonString}
 */
private String buildSparkParametersWithSparkSubmit(ProgramType programType, String sparkVersion) {
    // Main jar resource referenced by the parameters below.
    final ResourceInfo mainJar = new ResourceInfo();
    mainJar.setId(1);
    mainJar.setRes("dolphinscheduler-task-spark.jar");
    mainJar.setResourceName("/lib/dolphinscheduler-task-spark.jar");

    final SparkParameters params = new SparkParameters();

    // What to run and with which Spark version.
    params.setLocalParams(Collections.emptyList());
    params.setProgramType(programType);
    params.setMainClass("org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest");
    params.setDeployMode("client");
    params.setAppName("spark");
    params.setOthers("");
    params.setSparkVersion(sparkVersion);

    // Driver and executor sizing.
    params.setDriverCores(1);
    params.setDriverMemory("512M");
    params.setNumExecutors(2);
    params.setExecutorMemory("1G");
    params.setExecutorCores(2);

    params.setMainJar(mainJar);

    final String json = JSONUtils.toJsonString(params);
    return json;
}

2
dolphinscheduler-worker/src/main/bin/start.sh

@ -21,7 +21,7 @@ DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
chmod -R 700 ${DOLPHINSCHEDULER_HOME}/config
chmod -R 700 ${DOLPHINSCHEDULER_HOME}/conf
export DOLPHINSCHEDULER_WORK_HOME=${DOLPHINSCHEDULER_HOME}
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}

Loading…
Cancel
Save