
[1.3.6-prepare][Feature-4960][Spark] Add name input for Spark and improve Spark & MR args #4959 (#5017)

Shiwen Cheng committed 4 years ago via GitHub
commit e5dcbad66d
Changed files:
  1. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java (1 changed line)
  2. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (4 changed lines)
  3. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java (369 changed lines)
  4. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java (62 changed lines)
  5. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java (38 changed lines)
  6. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java (195 changed lines)
  7. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java (24 changed lines)
  8. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java (65 changed lines)
  9. dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue (17 changed lines)

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java (1 changed line)

@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import java.text.MessageFormat;

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (4 changed lines)

@@ -594,6 +594,10 @@ public final class Constants {
*/
public static final String EXECUTOR_MEMORY = "--executor-memory";
/**
* --name NAME
*/
public static final String SPARK_NAME = "--name";
/**
* --queue QUEUE
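Context for this hunk: SPARK_NAME maps to spark-submit's "--name NAME" option, which sets the application name shown in the Spark and YARN UIs. A minimal sketch of how the constant pairs with its value, mirroring the pattern SparkArgsUtils.buildArgs() uses below (the value here is hypothetical):

// Sketch only; the real usage (with escaping) is in SparkArgsUtils below.
List<String> args = new ArrayList<>();
args.add(Constants.SPARK_NAME); // "--name"
args.add("my-spark-app");       // user-supplied application name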

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java (369 changed lines)

@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.spark;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@@ -29,203 +29,214 @@ import java.util.List;
*/
public class SparkParameters extends AbstractParameters {
/**
* major jar
*/
private ResourceInfo mainJar;
/**
* major class
*/
private String mainClass;
/**
* deploy mode
*/
private String deployMode;
/**
* arguments
*/
private String mainArgs;
/**
* driver-cores Number of cores used by the driver, only in cluster mode
*/
private int driverCores;
/**
* driver-memory Memory for driver
*/
private String driverMemory;
/**
* num-executors Number of executors to launch
*/
private int numExecutors;
/**
* executor-cores Number of cores per executor
*/
private int executorCores;
/**
* Memory per executor
*/
private String executorMemory;
/**
* resource list
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* The YARN queue to submit to
*/
private String queue;
/**
* other arguments
*/
private String others;
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
*/
private ProgramType programType;
/**
* spark version
*/
private String sparkVersion;
public ResourceInfo getMainJar() {
return mainJar;
}
public void setMainJar(ResourceInfo mainJar) {
this.mainJar = mainJar;
}
public String getMainClass() {
return mainClass;
}
public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}
public String getDeployMode() {
return deployMode;
}
public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}
public String getMainArgs() {
return mainArgs;
}
public void setMainArgs(String mainArgs) {
this.mainArgs = mainArgs;
}
public int getDriverCores() {
return driverCores;
}
public void setDriverCores(int driverCores) {
this.driverCores = driverCores;
}
public String getDriverMemory() {
return driverMemory;
}
public void setDriverMemory(String driverMemory) {
this.driverMemory = driverMemory;
}
/**
* main jar
*/
private ResourceInfo mainJar;
/**
* main class
*/
private String mainClass;
/**
* deploy mode
*/
private String deployMode;
/**
* arguments
*/
private String mainArgs;
/**
* driver-cores Number of cores used by the driver, only in cluster mode
*/
private int driverCores;
/**
* driver-memory Memory for driver
*/
private String driverMemory;
/**
* num-executors Number of executors to launch
*/
private int numExecutors;
/**
* executor-cores Number of cores per executor
*/
private int executorCores;
/**
* Memory per executor
*/
private String executorMemory;
/**
* app name
*/
private String appName;
/**
* The YARN queue to submit to
*/
private String queue;
/**
* other arguments
*/
private String others;
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
*/
private ProgramType programType;
/**
* spark version
*/
private String sparkVersion;
/**
* resource list
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
public ResourceInfo getMainJar() {
return mainJar;
}
public void setMainJar(ResourceInfo mainJar) {
this.mainJar = mainJar;
}
public String getMainClass() {
return mainClass;
}
public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}
public String getDeployMode() {
return deployMode;
}
public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}
public String getMainArgs() {
return mainArgs;
}
public void setMainArgs(String mainArgs) {
this.mainArgs = mainArgs;
}
public int getDriverCores() {
return driverCores;
}
public void setDriverCores(int driverCores) {
this.driverCores = driverCores;
}
public String getDriverMemory() {
return driverMemory;
}
public void setDriverMemory(String driverMemory) {
this.driverMemory = driverMemory;
}
public int getNumExecutors() {
return numExecutors;
}
public int getNumExecutors() {
return numExecutors;
}
public void setNumExecutors(int numExecutors) {
this.numExecutors = numExecutors;
}
public void setNumExecutors(int numExecutors) {
this.numExecutors = numExecutors;
}
public int getExecutorCores() {
return executorCores;
}
public void setExecutorCores(int executorCores) {
this.executorCores = executorCores;
}
public int getExecutorCores() {
return executorCores;
}
public String getExecutorMemory() {
return executorMemory;
}
public void setExecutorCores(int executorCores) {
this.executorCores = executorCores;
}
public void setExecutorMemory(String executorMemory) {
this.executorMemory = executorMemory;
}
public String getExecutorMemory() {
return executorMemory;
}
public void setExecutorMemory(String executorMemory) {
this.executorMemory = executorMemory;
}
public String getQueue() {
return queue;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getQueue() {
return queue;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setQueue(String queue) {
this.queue = queue;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public String getOthers() {
return others;
}
public String getOthers() {
return others;
}
public void setOthers(String others) {
this.others = others;
}
public void setOthers(String others) {
this.others = others;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public ProgramType getProgramType() {
return programType;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public void setProgramType(ProgramType programType) {
this.programType = programType;
}
public ProgramType getProgramType() {
return programType;
}
public String getSparkVersion() {
return sparkVersion;
}
public void setProgramType(ProgramType programType) {
this.programType = programType;
}
public void setSparkVersion(String sparkVersion) {
this.sparkVersion = sparkVersion;
}
public String getSparkVersion() {
return sparkVersion;
}
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
}
public void setSparkVersion(String sparkVersion) {
this.sparkVersion = sparkVersion;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
}
return resourceList;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return resourceList;
}
}
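Note on this file: the substantive change is the new appName field with its getter and setter; everything else is member reordering plus whitespace cleanup, which is why the diff touches 369 lines. Because SparkTask (below) builds this object with JSONUtils.parseObject, appName is picked up from the task-definition JSON by field name. A minimal sketch with hypothetical values:

// Hypothetical task params; the field names match the payload assembled in spark.vue below.
String taskParams = "{\"appName\":\"my-spark-app\",\"programType\":\"SCALA\"}";
SparkParameters p = JSONUtils.parseObject(taskParams, SparkParameters.class);
// p.getAppName() -> "my-spark-app"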

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java (62 changed lines)

@@ -14,24 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* spark args utils
* spark args utils
*/
public class SparkArgsUtils {
private static final String SPARK_CLUSTER = "cluster";
private static final String SPARK_LOCAL = "local";
private static final String SPARK_ON_YARN = "yarn";
/**
* build args
*
@@ -40,29 +45,24 @@ public class SparkArgsUtils {
*/
public static List<String> buildArgs(SparkParameters param) {
List<String> args = new ArrayList<>();
String deployMode = "cluster";
args.add(Constants.MASTER);
if(StringUtils.isNotEmpty(param.getDeployMode())){
deployMode = param.getDeployMode();
}
if(!"local".equals(deployMode)){
args.add("yarn");
String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER;
if (!SPARK_LOCAL.equals(deployMode)) {
args.add(SPARK_ON_YARN);
args.add(Constants.DEPLOY_MODE);
}
args.add(param.getDeployMode());
args.add(deployMode);
ProgramType type = param.getProgramType();
String mainClass = param.getMainClass();
if(type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)){
if (type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(Constants.MAIN_CLASS);
args.add(mainClass);
}
int driverCores = param.getDriverCores();
if (driverCores != 0) {
if (driverCores > 0) {
args.add(Constants.DRIVER_CORES);
args.add(String.format("%d", driverCores));
}
@@ -74,13 +74,13 @@ public class SparkArgsUtils {
}
int numExecutors = param.getNumExecutors();
if (numExecutors != 0) {
if (numExecutors > 0) {
args.add(Constants.NUM_EXECUTORS);
args.add(String.format("%d", numExecutors));
}
int executorCores = param.getExecutorCores();
if (executorCores != 0) {
if (executorCores > 0) {
args.add(Constants.EXECUTOR_CORES);
args.add(String.format("%d", executorCores));
}
@@ -91,22 +91,26 @@
args.add(executorMemory);
}
// --files --conf --libjar ...
String others = param.getOthers();
String queue = param.getQueue();
if (StringUtils.isNotEmpty(others)) {
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) {
args.add(Constants.SPARK_NAME);
args.add(ArgsUtils.escape(appName));
}
if(!others.contains(Constants.SPARK_QUEUE) && StringUtils.isNotEmpty(queue)){
args.add(Constants.SPARK_QUEUE);
args.add(queue);
String others = param.getOthers();
if (!SPARK_LOCAL.equals(deployMode)) {
if (StringUtils.isEmpty(others) || !others.contains(Constants.SPARK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);
}
}
}
// --conf --files --jars --packages
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}else if (StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);
}
ResourceInfo mainJar = param.getMainJar();
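Three behavioral changes in buildArgs(): the new --name argument is emitted when appName is set, escaped through ArgsUtils.escape() (referenced here but defined outside this diff); --queue is now appended only in non-local mode, and only when "others" does not already carry a queue; and the driver/executor guards changed from != 0 to > 0 so negative values are no longer forwarded. A sketch of the result, assuming ArgsUtils.escape() quotes whitespace:

// Hypothetical values; unset options are simply skipped by buildArgs().
SparkParameters param = new SparkParameters();
param.setDeployMode("cluster");
param.setAppName("spark test");
param.setQueue("queue1");
String joined = String.join(" ", SparkArgsUtils.buildArgs(param));
// joined -> --master yarn --deploy-mode cluster --name "spark test" --queue queue1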

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java (38 changed lines)

@@ -26,21 +26,27 @@ import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
/**
* mapreduce task
*/
public class MapReduceTask extends AbstractYarnTask {
/**
* map reduce command
* usage: hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
*/
private static final String MAP_REDUCE_COMMAND = Constants.HADOOP;
/**
* mapreduce parameters
@@ -77,7 +83,6 @@ public class MapReduceTask extends AbstractYarnTask {
mapreduceParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
@@ -85,10 +90,10 @@ public class MapReduceTask extends AbstractYarnTask {
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null){
if (paramsMap != null) {
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
mapreduceParameters.setMainArgs(args);
if(mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON){
if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(), ParamUtils.convert(paramsMap));
mapreduceParameters.setOthers(others);
}
@@ -102,9 +107,13 @@
*/
@Override
protected String buildCommand() throws Exception {
List<String> parameterList = buildParameters(mapreduceParameters);
// hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
List<String> args = new ArrayList<>();
args.add(MAP_REDUCE_COMMAND);
args.addAll(buildParameters(mapreduceParameters));
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList),
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
logger.info("mapreduce task command: {}", command);
@@ -143,21 +152,18 @@
* @param mapreduceParameters mapreduce parameters
* @return parameter list
*/
private List<String> buildParameters(MapreduceParameters mapreduceParameters){
private List<String> buildParameters(MapreduceParameters mapreduceParameters) {
List<String> result = new ArrayList<>();
result.add(Constants.HADOOP);
// main jar
if(mapreduceParameters.getMainJar()!= null){
if (mapreduceParameters.getMainJar() != null) {
result.add(Constants.JAR);
result.add(mapreduceParameters.getMainJar().getRes());
}
// main class
if(!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
&& StringUtils.isNotEmpty(mapreduceParameters.getMainClass())){
if (!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
&& StringUtils.isNotEmpty(mapreduceParameters.getMainClass())) {
result.add(mapreduceParameters.getMainClass());
}
@@ -170,13 +176,13 @@ public class MapReduceTask extends AbstractYarnTask {
}
result.add(mapreduceParameters.getOthers());
}else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
} else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue()));
}
// command args
if(StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())){
if (StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())) {
result.add(mapreduceParameters.getMainArgs());
}
return result;
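With the hadoop prefix moved from buildParameters() into buildCommand(), the parameter list now holds only the jar, main class, options, and program arguments. For a Java MR job with queue "default" and no extra options, the joined command comes out roughly as below (paths and class are hypothetical, and this assumes Constants.D is "-D" and Constants.MR_QUEUE names the mapreduce.job.queuename property; neither value appears in this diff):

hadoop jar wordcount.jar org.example.WordCount -D mapreduce.job.queuename=default /input /output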

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java (195 changed lines)

@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -24,131 +25,137 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
/**
* spark task
*/
public class SparkTask extends AbstractYarnTask {
/**
* spark1 command
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
/**
* spark1 command
* usage: spark-submit [options] <app jar | python file> [app arguments]
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
/**
* spark2 command
* usage: spark-submit [options] <app jar | python file> [app arguments]
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
/**
* spark parameters
*/
private SparkParameters sparkParameters;
/**
* taskExecutionContext
*/
private final TaskExecutionContext sparkTaskExecutionContext;
public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.sparkTaskExecutionContext = taskExecutionContext;
}
/**
* spark2 command
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
@Override
public void init() {
/**
* spark parameters
*/
private SparkParameters sparkParameters;
logger.info("spark task params {}", sparkTaskExecutionContext.getTaskParams());
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
sparkParameters = JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(), SparkParameters.class);
public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
}
if (null == sparkParameters) {
logger.error("Spark params is null");
return;
}
@Override
public void init() {
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(sparkTaskExecutionContext.getQueue());
setMainJarName();
}
logger.info("spark task params {}", taskExecutionContext.getTaskParams());
/**
* create command
*
* @return command
*/
@Override
protected String buildCommand() {
// spark-submit [options] <app jar | python file> [app arguments]
List<String> args = new ArrayList<>();
//spark version
String sparkCommand = SPARK2_COMMAND;
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SPARK1_COMMAND;
}
sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class);
args.add(sparkCommand);
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(taskExecutionContext.getQueue());
// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
setMainJarName();
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()),
sparkTaskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()),
sparkTaskExecutionContext.getScheduleTime());
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();
String command = null;
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (null != paramsMap) {
command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
}
if (paramsMap != null ){
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
}
sparkParameters.setMainArgs(args);
}
}
logger.info("spark task command: {}", command);
/**
* create command
* @return command
*/
@Override
protected String buildCommand() {
List<String> args = new ArrayList<>();
return command;
}
//spark version
String sparkCommand = SPARK2_COMMAND;
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = sparkParameters.getMainJar();
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SPARK1_COMMAND;
}
if (null == mainJar) {
throw new RuntimeException("Spark task jar params is null");
}
args.add(sparkCommand);
// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
logger.info("spark task command : {}", command);
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = sparkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
sparkParameters.setMainJar(mainJar);
mainJar.setRes(resourceName);
sparkParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return sparkParameters;
}
@Override
public AbstractParameters getParameters() {
return sparkParameters;
}
}
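Summary of the SparkTask refactor: init() now fails fast when the parsed parameters are null or invalid, placeholder substitution over mainArgs moved from buildCommand() into init(), and buildCommand() runs a second substitution pass over the joined command using the context's defined params; setMainJarName() likewise throws instead of silently skipping a missing jar. A sketch of the substitution step, assuming a hypothetical user-defined parameter dt=20210310:

// mainArgs as configured in the task definition: "--input /data/${dt}"
// after init() calls ParameterUtils.convertParameterPlaceholders: "--input /data/20210310"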

dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java (24 changed lines)

@@ -98,35 +98,35 @@ public class FlinkArgsUtilsTest {
assertEquals("yarn-cluster", result.get(1));
assertEquals("-ys", result.get(2));
assertSame(Integer.valueOf(result.get(3)),slot);
assertSame(slot, Integer.valueOf(result.get(3)));
assertEquals("-ynm",result.get(4));
assertEquals(result.get(5),appName);
assertEquals("-ynm", result.get(4));
assertEquals(appName, result.get(5));
assertEquals("-yn", result.get(6));
assertSame(Integer.valueOf(result.get(7)),taskManager);
assertSame(taskManager, Integer.valueOf(result.get(7)));
assertEquals("-yjm", result.get(8));
assertEquals(result.get(9),jobManagerMemory);
assertEquals(jobManagerMemory, result.get(9));
assertEquals("-ytm", result.get(10));
assertEquals(result.get(11),taskManagerMemory);
assertEquals(taskManagerMemory, result.get(11));
assertEquals("-yqu", result.get(12));
assertEquals(result.get(13),queue);
assertEquals(queue, result.get(13));
assertEquals("-p", result.get(14));
assertSame(Integer.valueOf(result.get(15)),parallelism);
assertSame(parallelism, Integer.valueOf(result.get(15)));
assertEquals("-sae", result.get(16));
assertEquals(result.get(17),others);
assertEquals(others, result.get(17));
assertEquals("-c", result.get(18));
assertEquals(result.get(19),mainClass);
assertEquals(mainClass, result.get(19));
assertEquals(result.get(20),mainJar.getRes());
assertEquals(result.get(21),mainArgs);
assertEquals(mainJar.getRes(), result.get(20));
assertEquals(mainArgs, result.get(21));
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();
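These test edits only swap assertion arguments into JUnit 4's (expected, actual) order; the behavior under test is unchanged, but failure messages now read correctly. For example, with the corrected assertEquals(appName, result.get(5)), a mismatch prints "expected:<the configured app name> but was:<the actual argument>" rather than the reverse.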

dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java (65 changed lines)

@@ -17,19 +17,20 @@
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
/**
* Test SparkArgsUtils
*/
@@ -48,12 +49,11 @@ public class SparkArgsUtilsTest {
public int executorCores = 6;
public String sparkVersion = "SPARK1";
public int numExecutors = 4;
public String appName = "spark test";
public String queue = "queue1";
@Before
public void setUp() throws Exception {
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testspark-1.0.0-SNAPSHOT.jar");
mainJar = main;
@@ -78,6 +78,7 @@ public class SparkArgsUtilsTest {
param.setProgramType(programType);
param.setSparkVersion(sparkVersion);
param.setMainArgs(mainArgs);
param.setAppName(appName);
param.setQueue(queue);
//Invoke buildArgs
@@ -87,42 +88,46 @@ public class SparkArgsUtilsTest {
}
//Expected values and order
assertEquals(result.size(),20);
assertEquals(22, result.size());
assertEquals("--master", result.get(0));
assertEquals("yarn", result.get(1));
assertEquals("--deploy-mode", result.get(2));
assertEquals(mode, result.get(3));
assertEquals(result.get(0),"--master");
assertEquals(result.get(1),"yarn");
assertEquals("--class", result.get(4));
assertEquals(mainClass, result.get(5));
assertEquals(result.get(2),"--deploy-mode");
assertEquals(result.get(3),mode);
assertEquals("--driver-cores", result.get(6));
assertSame(driverCores, Integer.valueOf(result.get(7)));
assertEquals(result.get(4),"--class");
assertEquals(result.get(5),mainClass);
assertEquals("--driver-memory", result.get(8));
assertEquals(driverMemory, result.get(9));
assertEquals(result.get(6),"--driver-cores");
assertSame(Integer.valueOf(result.get(7)),driverCores);
assertEquals("--num-executors", result.get(10));
assertSame(numExecutors, Integer.valueOf(result.get(11)));
assertEquals(result.get(8),"--driver-memory");
assertEquals(result.get(9),driverMemory);
assertEquals("--executor-cores", result.get(12));
assertSame(executorCores, Integer.valueOf(result.get(13)));
assertEquals(result.get(10),"--num-executors");
assertSame(Integer.valueOf(result.get(11)),numExecutors);
assertEquals("--executor-memory", result.get(14));
assertEquals(executorMemory, result.get(15));
assertEquals(result.get(12),"--executor-cores");
assertSame(Integer.valueOf(result.get(13)),executorCores);
assertEquals("--name", result.get(16));
assertEquals(ArgsUtils.escape(appName), result.get(17));
assertEquals(result.get(14),"--executor-memory");
assertEquals(result.get(15),executorMemory);
assertEquals("--queue", result.get(18));
assertEquals(queue, result.get(19));
assertEquals(result.get(16),"--queue");
assertEquals(result.get(17),queue);
assertEquals(result.get(18),mainJar.getRes());
assertEquals(result.get(19),mainArgs);
assertEquals(mainJar.getRes(), result.get(20));
assertEquals(mainArgs, result.get(21));
//Others param without --queue
SparkParameters param1 = new SparkParameters();
param1.setOthers("--files xxx/hive-site.xml");
param1.setQueue(queue);
result = SparkArgsUtils.buildArgs(param1);
assertEquals(result.size(),7);
assertEquals(7, result.size());
}
}
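Why the expected size moved from 20 to 22: --name and the escaped app name are inserted ahead of --queue, pushing mainJar.getRes() and mainArgs out to indices 20 and 21. The second scenario still yields 7 arguments because it sets only "others" and the queue, so no --name pair is produced.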

dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue (17 changed lines)

@@ -79,6 +79,18 @@
</x-radio-group>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('App Name')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="input"
v-model="appName"
:placeholder="$t('Please enter app name(optional)')"
autocomplete="off">
</x-input>
</div>
</m-list-box>
<div class="list-box-4p">
<div class="clearfix list">
<span class="sp1">{{$t('Driver Cores')}}</span>
@@ -241,6 +253,8 @@
executorMemory: '2G',
// Executor cores
executorCores: 2,
// Spark app name
appName: '',
// Main arguments
mainArgs: '',
// Option parameters
@@ -475,6 +489,7 @@
numExecutors: this.numExecutors,
executorMemory: this.executorMemory,
executorCores: this.executorCores,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType,
@@ -534,6 +549,7 @@
numExecutors: this.numExecutors,
executorMemory: this.executorMemory,
executorCores: this.executorCores,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType,
@@ -575,6 +591,7 @@
this.numExecutors = o.params.numExecutors || 2
this.executorMemory = o.params.executorMemory || '2G'
this.executorCores = o.params.executorCores || 2
this.appName = o.params.appName || ''
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'SCALA'
