
[Feature][MR] Support mapreduce name (#5029)

Shiwen Cheng committed 4 years ago (via GitHub) · commit beb5adb985

  1. 23  dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
  2. 11  dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  3. 20  dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java
  4. 4  dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
  5. 6  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
  6. 85  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java
  7. 8  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
  8. 189  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
  9. 65  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
  10. 21  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
  11. 7  dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
  12. 95  dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java
  13. 2  dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
  14. 17  dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue
  15. 1  pom.xml

23  dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java

@@ -17,37 +17,36 @@
package org.apache.dolphinscheduler.api.utils;
import org.apache.commons.lang.StringUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Map;
import static org.junit.Assert.*;
public class CheckUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(CheckUtilsTest.class);
@@ -171,8 +170,8 @@ public class CheckUtilsTest {
sqlParameters.setSql("yy");
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sqlParameters), TaskType.SQL.toString()));
// MapreduceParameters
MapreduceParameters mapreduceParameters = new MapreduceParameters();
// MapReduceParameters
MapReduceParameters mapreduceParameters = new MapReduceParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));
ResourceInfo resourceInfoMapreduce = new ResourceInfo();

11  dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -544,9 +544,7 @@ public final class Constants {
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
/**
* hadoop params
* jar
*/
public static final String JAR = "jar";
@@ -557,12 +555,17 @@ public final class Constants {
public static final String HADOOP = "hadoop";
/**
* -D parameter
* -D <property>=<value>
*/
public static final String D = "-D";
/**
* -D mapreduce.job.queuename=ququename
* -D mapreduce.job.name=name
*/
public static final String MR_NAME = "mapreduce.job.name";
/**
* -D mapreduce.job.queuename=queuename
*/
public static final String MR_QUEUE = "mapreduce.job.queuename";
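
For reference (this sketch is not part of the commit), the two constants compose into ordinary Hadoop generic options; the job name and queue values below are made-up placeholders.

import org.apache.dolphinscheduler.common.Constants;

public class MrJobNameOptionSketch {
    public static void main(String[] args) {
        // "-D" + "mapreduce.job.name" + "=" + value  ->  -Dmapreduce.job.name=my-wordcount-job
        String nameOption = String.format("%s%s=%s", Constants.D, Constants.MR_NAME, "my-wordcount-job");
        // "-D" + "mapreduce.job.queuename" + "=" + value  ->  -Dmapreduce.job.queuename=default
        String queueOption = String.format("%s%s=%s", Constants.D, Constants.MR_QUEUE, "default");
        System.out.println(nameOption + " " + queueOption);
    }
}

Such -D properties are generic Hadoop options, which is why the new MapReduceArgsUtils below emits them after the jar and main class but ahead of the trailing main arguments.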

20  dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java → dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java

@@ -14,17 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.mr;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
public class MapreduceParameters extends AbstractParameters {
/**
* mapreduce parameters
*/
public class MapReduceParameters extends AbstractParameters {
/**
* major jar
@@ -46,6 +49,11 @@ public class MapreduceParameters extends AbstractParameters {
*/
private String others;
/**
* app name
*/
private String appName;
/**
* queue
*/
@@ -87,6 +95,14 @@ public class MapreduceParameters extends AbstractParameters {
this.others = others;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public String getQueue() {
return queue;
}
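
For orientation only (not in the diff): a minimal sketch of how the new appName field rides along in the task-params JSON that the worker later parses with JSONUtils.parseObject; the jar name and main class are invented placeholders.

import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;

public class MapReduceParametersSketch {
    public static void main(String[] args) {
        // Hypothetical task-params payload; appName and queue are the fields relevant to this change.
        String taskParams = "{\"mainClass\":\"com.examples.WordCount\","
                + "\"mainJar\":{\"res\":\"wordcount.jar\"},"
                + "\"appName\":\"wordcount demo\","
                + "\"queue\":\"default\","
                + "\"programType\":\"JAVA\"}";
        MapReduceParameters params = JSONUtils.parseObject(taskParams, MapReduceParameters.class);
        System.out.println(params.getAppName()); // wordcount demo
        System.out.println(params.getQueue());   // default
    }
}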

4  dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java

@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@@ -71,7 +71,7 @@ public class TaskParametersUtils {
case SQL:
return JSONUtils.parseObject(parameter, SqlParameters.class);
case MR:
return JSONUtils.parseObject(parameter, MapreduceParameters.class);
return JSONUtils.parseObject(parameter, MapReduceParameters.class);
case SPARK:
return JSONUtils.parseObject(parameter, SparkParameters.class);
case PYTHON:

6  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@@ -30,9 +30,15 @@ import java.util.List;
* flink args utils
*/
public class FlinkArgsUtils {
private static final String LOCAL_DEPLOY_MODE = "local";
private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
private FlinkArgsUtils() {
throw new IllegalStateException("Utility class");
}
/**
* build args
* @param param flink parameters

85  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* mapreduce args utils
*/
public class MapReduceArgsUtils {
private MapReduceArgsUtils() {
throw new IllegalStateException("Utility class");
}
/**
* build args
*
* @param param param
* @return argument list
*/
public static List<String> buildArgs(MapReduceParameters param) {
List<String> args = new ArrayList<>();
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(Constants.JAR);
args.add(mainJar.getRes());
}
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(mainClass);
}
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) {
args.add(String.format("%s%s=%s", Constants.D, Constants.MR_NAME, ArgsUtils.escape(appName)));
}
String others = param.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(Constants.MR_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(String.format("%s%s=%s", Constants.D, Constants.MR_QUEUE, queue));
}
}
// -conf -archives -files -libjars -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
String mainArgs = param.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
args.add(mainArgs);
}
return args;
}
}
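
A usage sketch under the same assumptions as the unit test added below (jar name, main class, and paths are placeholders): buildArgs returns the ordered argument list, and MapReduceTask.buildCommand() prepends the hadoop binary and joins it with spaces (placeholder substitution is skipped here).

import java.util.ArrayList;
import java.util.List;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;

public class MapReduceCommandSketch {
    public static void main(String[] args) {
        MapReduceParameters param = new MapReduceParameters();
        ResourceInfo jar = new ResourceInfo();
        jar.setRes("wordcount.jar");                  // placeholder jar
        param.setMainJar(jar);
        param.setMainClass("com.examples.WordCount"); // placeholder main class
        param.setProgramType(ProgramType.JAVA);
        param.setAppName("wordcount demo");
        param.setQueue("default");
        param.setMainArgs("/input /output");

        List<String> command = new ArrayList<>();
        command.add(Constants.HADOOP);                       // "hadoop", as MapReduceTask.buildCommand() does
        command.addAll(MapReduceArgsUtils.buildArgs(param)); // jar, main class, -D options, main args
        // Roughly: hadoop jar wordcount.jar com.examples.WordCount
        //          -Dmapreduce.job.name=<escaped app name> -Dmapreduce.job.queuename=default /input /output
        System.out.println(String.join(" ", command));
    }
}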

8  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java

@@ -37,6 +37,10 @@ public class SparkArgsUtils {
private static final String SPARK_ON_YARN = "yarn";
private SparkArgsUtils() {
throw new IllegalStateException("Utility class");
}
/**
* build args
*
@@ -54,9 +58,9 @@ public class SparkArgsUtils {
}
args.add(deployMode);
ProgramType type = param.getProgramType();
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(Constants.MAIN_CLASS);
args.add(mainClass);
}

189  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java

@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.flink;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -21,129 +22,127 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
/**
* flink task
*/
public class FlinkTask extends AbstractYarnTask {
/**
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
private static final String FLINK_COMMAND = "flink";
private static final String FLINK_RUN = "run";
/**
* flink parameters
*/
private FlinkParameters flinkParameters;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
public FlinkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
}
/**
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
private static final String FLINK_COMMAND = "flink";
private static final String FLINK_RUN = "run";
/**
* flink parameters
*/
private FlinkParameters flinkParameters;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
public FlinkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() {
@Override
public void init() {
logger.info("flink task params {}", taskExecutionContext.getTaskParams());
logger.info("flink task params {}", taskExecutionContext.getTaskParams());
flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class);
flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class);
if (!flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");
if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs();
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
flinkParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
logger.info("param Map : {}", paramsMap);
if (paramsMap != null) {
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
logger.info("param args : {}", args);
}
flinkParameters.setMainArgs(args);
}
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
/**
* create command
* @return command
*/
@Override
protected String buildCommand() {
// flink run [OPTIONS] <jar-file> <arguments>
List<String> args = new ArrayList<>();
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs();
args.add(FLINK_COMMAND);
args.add(FLINK_RUN);
logger.info("flink task args : {}", args);
// other parameters
args.addAll(FlinkArgsUtils.buildArgs(flinkParameters));
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
flinkParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
logger.info("flink task command : {}", command);
logger.info("param Map : {}", paramsMap);
if (paramsMap != null ){
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
logger.info("param args : {}", args);
}
flinkParameters.setMainArgs(args);
return command;
}
}
/**
* create command
* @return command
*/
@Override
protected String buildCommand() {
// flink run [OPTIONS] <jar-file> <arguments>
List<String> args = new ArrayList<>();
args.add(FLINK_COMMAND);
args.add(FLINK_RUN);
logger.info("flink task args : {}", args);
// other parameters
args.addAll(FlinkArgsUtils.buildArgs(flinkParameters));
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
logger.info("flink task command : {}", command);
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
flinkParameters.setMainJar(mainJar);
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
flinkParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return flinkParameters;
}
@Override
public AbstractParameters getParameters() {
return flinkParameters;
}
}

65  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java

@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.mr;
import org.apache.dolphinscheduler.common.Constants;
@@ -22,12 +23,12 @@ import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
@@ -43,15 +44,15 @@ import org.slf4j.Logger;
public class MapReduceTask extends AbstractYarnTask {
/**
* map reduce command
* mapreduce command
* usage: hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
*/
private static final String MAP_REDUCE_COMMAND = Constants.HADOOP;
private static final String MAPREDUCE_COMMAND = Constants.HADOOP;
/**
* mapreduce parameters
*/
private MapreduceParameters mapreduceParameters;
private MapReduceParameters mapreduceParameters;
/**
* taskExecutionContext
@@ -73,10 +74,10 @@ public class MapReduceTask extends AbstractYarnTask {
logger.info("mapreduce task params {}", taskExecutionContext.getTaskParams());
this.mapreduceParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapreduceParameters.class);
this.mapreduceParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapReduceParameters.class);
// check parameters
if (!mapreduceParameters.checkParameters()) {
if (mapreduceParameters == null || !mapreduceParameters.checkParameters()) {
throw new RuntimeException("mapreduce task params is not valid");
}
@@ -103,15 +104,15 @@ public class MapReduceTask extends AbstractYarnTask {
/**
* build command
* @return command
* @throws Exception exception
*/
@Override
protected String buildCommand() throws Exception {
protected String buildCommand() {
// hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
List<String> args = new ArrayList<>();
args.add(MAP_REDUCE_COMMAND);
args.add(MAPREDUCE_COMMAND);
args.addAll(buildParameters(mapreduceParameters));
// other parameters
args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters));
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
@@ -146,46 +147,4 @@ public class MapReduceTask extends AbstractYarnTask {
public AbstractParameters getParameters() {
return mapreduceParameters;
}
/**
* build parameters
* @param mapreduceParameters mapreduce parameters
* @return parameter list
*/
private List<String> buildParameters(MapreduceParameters mapreduceParameters) {
List<String> result = new ArrayList<>();
// main jar
if (mapreduceParameters.getMainJar() != null) {
result.add(Constants.JAR);
result.add(mapreduceParameters.getMainJar().getRes());
}
// main class
if (!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
&& StringUtils.isNotEmpty(mapreduceParameters.getMainClass())) {
result.add(mapreduceParameters.getMainClass());
}
// others
if (StringUtils.isNotEmpty(mapreduceParameters.getOthers())) {
String others = mapreduceParameters.getOthers();
if (!others.contains(Constants.MR_QUEUE)
&& StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue()));
}
result.add(mapreduceParameters.getOthers());
} else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue()));
}
// command args
if (StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())) {
result.add(mapreduceParameters.getMainArgs());
}
return result;
}
}

21  dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java

@@ -62,19 +62,19 @@ public class SparkTask extends AbstractYarnTask {
/**
* taskExecutionContext
*/
private final TaskExecutionContext sparkTaskExecutionContext;
private TaskExecutionContext taskExecutionContext;
public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.sparkTaskExecutionContext = taskExecutionContext;
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() {
logger.info("spark task params {}", sparkTaskExecutionContext.getTaskParams());
logger.info("spark task params {}", taskExecutionContext.getTaskParams());
sparkParameters = JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(), SparkParameters.class);
sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class);
if (null == sparkParameters) {
logger.error("Spark params is null");
@@ -84,13 +84,12 @@ public class SparkTask extends AbstractYarnTask {
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(sparkTaskExecutionContext.getQueue());
sparkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
}
/**
* create command
*
* @return command
*/
@Override
@@ -98,7 +97,7 @@
// spark-submit [options] <app jar | python file> [app arguments]
List<String> args = new ArrayList<>();
//spark version
// spark version
String sparkCommand = SPARK2_COMMAND;
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
@@ -111,11 +110,11 @@
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()),
sparkTaskExecutionContext.getDefinedParams(),
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()),
sparkTaskExecutionContext.getScheduleTime());
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
String command = null;

7  dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@@ -53,10 +53,8 @@ public class FlinkArgsUtilsTest {
public String others = "-s hdfs:///flink/savepoint-1537";
public String flinkVersion = "<1.10";
@Before
public void setUp() throws Exception {
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testflink-1.0.0-SNAPSHOT.jar");
mainJar = main;
@@ -67,7 +65,6 @@
*/
@Test
public void testBuildArgs() {
//Define params
FlinkParameters param = new FlinkParameters();
param.setDeployMode(mode);
@@ -134,6 +131,6 @@
param1.setDeployMode(mode);
result = FlinkArgsUtils.buildArgs(param1);
assertEquals(5, result.size());
}
}

95  dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test MapReduceArgsUtils
*/
public class MapReduceArgsUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(MapReduceArgsUtilsTest.class);
public String mainClass = "com.examples.WordCount";
public ResourceInfo mainJar = null;
public String mainArgs = "/user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt";
public ProgramType programType = ProgramType.JAVA;
public String others = "-files cachefile.txt -libjars mylib.jar -archives myarchive.zip -Dwordcount.case.sensitive=false";
public String appName = "mapreduce test";
public String queue = "queue1";
@Before
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testspark-1.0.0-SNAPSHOT.jar");
mainJar = main;
}
/**
* Test buildArgs
*/
@Test
public void testBuildArgs() {
//Define params
MapReduceParameters param = new MapReduceParameters();
param.setMainClass(mainClass);
param.setMainJar(mainJar);
param.setMainArgs(mainArgs);
param.setProgramType(programType);
param.setOthers(others);
param.setAppName(appName);
param.setQueue(queue);
//Invoke buildArgs
List<String> result = MapReduceArgsUtils.buildArgs(param);
for (String s : result) {
logger.info(s);
}
//Expected values and order
assertEquals(7, result.size());
assertEquals("jar", result.get(0));
assertEquals(mainJar.getRes(), result.get(1));
assertEquals(mainClass, result.get(2));
assertEquals(String.format("-D%s=%s", Constants.MR_NAME, ArgsUtils.escape(appName)), result.get(3));
assertEquals(String.format("-D%s=%s", Constants.MR_QUEUE, queue), result.get(4));
assertEquals(others, result.get(5));
assertEquals(mainArgs, result.get(6));
//Others param without --queue
param.setOthers("-files xxx/hive-site.xml");
param.setQueue(null);
result = MapReduceArgsUtils.buildArgs(param);
assertEquals(6, result.size());
}
}

2  dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java

@@ -64,7 +64,6 @@
*/
@Test
public void testBuildArgs() {
//Define params
SparkParameters param = new SparkParameters();
param.setDeployMode(mode);
@@ -130,4 +129,5 @@
result = SparkArgsUtils.buildArgs(param1);
assertEquals(7, result.size());
}
}

17  dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue

@@ -49,6 +49,18 @@
</treeselect>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('App Name')}}</div>
<div slot="content">
<el-input
:disabled="isDetails"
type="input"
size="small"
v-model="appName"
:placeholder="$t('Please enter app name(optional)')">
</el-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Main Arguments')}}</div>
<div slot="content">
@@ -122,6 +134,8 @@
cacheResourceList: [],
// Custom parameter
localParams: [],
// MR app name
appName: '',
// Main arguments
mainArgs: '',
// Option parameters
@@ -282,6 +296,7 @@
return { id: v }
}),
localParams: this.localParams,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType
@@ -342,6 +357,7 @@
},
resourceList: this.resourceIdArr,
localParams: this.localParams,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType
@@ -367,6 +383,7 @@
} else {
this.mainJar = o.params.mainJar.id || ''
}
this.appName = o.params.appName || ''
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'JAVA'

1  pom.xml

@@ -936,6 +936,7 @@
<include>**/server/utils/HostTest.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/MapReduceArgsUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
