
[Feature-4960][Spark] Add name input for Spark and improve Spark & MR args (#4959)

* [Improvement][Spark] Improve spark args and add name input

* [Improvement][MR] Improve map reduce args

* [Improvement][Spark] Fix check style

* [Improvement][Spark] Fix check style for spark, flink and mr
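In short, the change adds an appName field to SparkParameters and forwards it to spark-submit as --name, escaping the value so a name containing spaces stays a single argument. Below is a minimal sketch of that idea; the escape helper is a stand-in for the project's ArgsUtils.escape, and quote-wrapping is an assumption, not the exact implementation.

    import java.util.ArrayList;
    import java.util.List;

    public class SparkNameArgSketch {

        private static final String SPARK_NAME = "--name";

        // Stand-in for ArgsUtils.escape: wrap in double quotes so a value such
        // as "spark test" survives shell splitting (assumed quoting rule only).
        private static String escape(String value) {
            return "\"" + value.replace("\"", "\\\"") + "\"";
        }

        static List<String> appendName(List<String> args, String appName) {
            if (appName != null && !appName.isEmpty()) {
                args.add(SPARK_NAME);
                args.add(escape(appName));
            }
            return args;
        }

        public static void main(String[] argv) {
            List<String> args = new ArrayList<>(List.of("--master", "yarn", "--deploy-mode", "cluster"));
            appendName(args, "spark test");
            // Prints: --master yarn --deploy-mode cluster --name "spark test"
            System.out.println(String.join(" ", args));
        }
    }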
Shiwen Cheng 4 years ago committed by GitHub
commit e74932cfc2
  1. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (4 changed lines)
  2. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java (39 changed lines)
  3. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java (1 changed line)
  4. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java (37 changed lines)
  5. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java (24 changed lines)
  6. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java (3 changed lines)
  7. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java (22 changed lines)
  8. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java (65 changed lines)
  9. dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue (17 changed lines)

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (4 changed lines)

@ -604,6 +604,10 @@ public final class Constants {
*/
public static final String EXECUTOR_MEMORY = "--executor-memory";
/**
* --name NAME
*/
public static final String SPARK_NAME = "--name";
/**
* --queue QUEUE

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java (39 changed lines)

@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.spark;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@ -30,12 +30,12 @@ import java.util.List;
public class SparkParameters extends AbstractParameters {
/**
* major jar
* main jar
*/
private ResourceInfo mainJar;
/**
* major class
* main class
*/
private String mainClass;
@ -76,9 +76,9 @@ public class SparkParameters extends AbstractParameters {
private String executorMemory;
/**
* resource list
* app name
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
private String appName;
/**
* The YARN queue to submit to
@ -101,6 +101,11 @@ public class SparkParameters extends AbstractParameters {
*/
private String sparkVersion;
/**
* resource list
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
public ResourceInfo getMainJar() {
return mainJar;
}
@ -173,6 +178,13 @@ public class SparkParameters extends AbstractParameters {
this.executorMemory = executorMemory;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public String getQueue() {
return queue;
@ -182,14 +194,6 @@ public class SparkParameters extends AbstractParameters {
this.queue = queue;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public String getOthers() {
return others;
}
@ -198,6 +202,14 @@ public class SparkParameters extends AbstractParameters {
this.others = others;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public ProgramType getProgramType() {
return programType;
}
@ -227,5 +239,4 @@ public class SparkParameters extends AbstractParameters {
return resourceList;
}
}

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java (1 changed line)

@ -129,5 +129,4 @@ public class FlinkArgsUtils {
return args;
}
}

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java (37 changed lines)

@ -45,19 +45,14 @@ public class SparkArgsUtils {
*/
public static List<String> buildArgs(SparkParameters param) {
List<String> args = new ArrayList<>();
String deployMode = SPARK_CLUSTER;
args.add(Constants.MASTER);
if (StringUtils.isNotEmpty(param.getDeployMode())) {
deployMode = param.getDeployMode();
}
String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER;
if (!SPARK_LOCAL.equals(deployMode)) {
args.add(SPARK_ON_YARN);
args.add(Constants.DEPLOY_MODE);
}
args.add(param.getDeployMode());
args.add(deployMode);
ProgramType type = param.getProgramType();
String mainClass = param.getMainClass();
@ -67,7 +62,7 @@ public class SparkArgsUtils {
}
int driverCores = param.getDriverCores();
if (driverCores != 0) {
if (driverCores > 0) {
args.add(Constants.DRIVER_CORES);
args.add(String.format("%d", driverCores));
}
@ -79,13 +74,13 @@ public class SparkArgsUtils {
}
int numExecutors = param.getNumExecutors();
if (numExecutors != 0) {
if (numExecutors > 0) {
args.add(Constants.NUM_EXECUTORS);
args.add(String.format("%d", numExecutors));
}
int executorCores = param.getExecutorCores();
if (executorCores != 0) {
if (executorCores > 0) {
args.add(Constants.EXECUTOR_CORES);
args.add(String.format("%d", executorCores));
}
@ -96,22 +91,26 @@ public class SparkArgsUtils {
args.add(executorMemory);
}
// --files --conf --libjar ...
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) {
args.add(Constants.SPARK_NAME);
args.add(ArgsUtils.escape(appName));
}
String others = param.getOthers();
if (!SPARK_LOCAL.equals(deployMode)) {
if (StringUtils.isEmpty(others) || !others.contains(Constants.SPARK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(others)) {
if (!others.contains(Constants.SPARK_QUEUE) && StringUtils.isNotEmpty(queue)) {
if (StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);
}
}
}
// --conf --files --jars --packages
if (StringUtils.isNotEmpty(others)) {
args.add(others);
} else if (StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);
}
ResourceInfo mainJar = param.getMainJar();
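A hedged usage sketch of the refactored builder, mirroring the expectations in the updated SparkArgsUtilsTest further down. The main class, driver values, and main args below are placeholders, and the setter names are assumed to follow the usual bean convention for the getters used in SparkArgsUtils.

    import java.util.List;

    import org.apache.dolphinscheduler.common.enums.ProgramType;
    import org.apache.dolphinscheduler.common.process.ResourceInfo;
    import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
    import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;

    public class SparkArgsUsageSketch {
        public static void main(String[] argv) {
            ResourceInfo mainJar = new ResourceInfo();
            mainJar.setRes("testspark-1.0.0-SNAPSHOT.jar");

            SparkParameters param = new SparkParameters();
            param.setDeployMode("cluster");
            param.setMainClass("org.example.SparkApp");   // placeholder class name
            param.setDriverCores(1);                      // placeholder value
            param.setDriverMemory("512M");                // placeholder value
            param.setNumExecutors(4);
            param.setExecutorCores(6);
            param.setExecutorMemory("2G");
            param.setAppName("spark test");
            param.setQueue("queue1");
            param.setMainJar(mainJar);
            param.setMainArgs("arg1 arg2");               // placeholder args
            param.setProgramType(ProgramType.SCALA);
            param.setSparkVersion("SPARK1");

            // Per the test below, the resulting 22 tokens are expected to include
            // "--name" followed by the escaped app name and, because "others" does
            // not already carry --queue, "--queue queue1" before the jar and args.
            List<String> args = SparkArgsUtils.buildArgs(param);
            System.out.println(String.join(" ", args));
        }
    }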

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java (24 changed lines)

@ -23,24 +23,30 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
/**
* mapreduce task
*/
public class MapReduceTask extends AbstractYarnTask {
/**
* map reduce command
* usage: hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
*/
private static final String MAP_REDUCE_COMMAND = Constants.HADOOP;
/**
* mapreduce parameters
@ -77,7 +83,6 @@ public class MapReduceTask extends AbstractYarnTask {
mapreduceParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
@ -102,9 +107,13 @@ public class MapReduceTask extends AbstractYarnTask {
*/
@Override
protected String buildCommand() throws Exception {
List<String> parameterList = buildParameters(mapreduceParameters);
// hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
List<String> args = new ArrayList<>();
args.add(MAP_REDUCE_COMMAND);
args.addAll(buildParameters(mapreduceParameters));
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList),
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
logger.info("mapreduce task command: {}", command);
@ -144,11 +153,8 @@ public class MapReduceTask extends AbstractYarnTask {
* @return parameter list
*/
private List<String> buildParameters(MapreduceParameters mapreduceParameters) {
List<String> result = new ArrayList<>();
result.add(Constants.HADOOP);
// main jar
if (mapreduceParameters.getMainJar() != null) {
result.add(Constants.JAR);
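The refactor moves the hadoop launcher out of buildParameters and into the MAP_REDUCE_COMMAND constant, which buildCommand now prepends before joining the arguments and resolving placeholders. A hedged sketch of the resulting command shape follows; the jar, class, generic option, and paths are made-up placeholder values.

    import java.util.ArrayList;
    import java.util.List;

    public class MapReduceCommandSketch {
        public static void main(String[] argv) {
            // hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
            List<String> args = new ArrayList<>();
            args.add("hadoop");                              // MAP_REDUCE_COMMAND (Constants.HADOOP)
            args.add("jar");                                 // Constants.JAR
            args.add("wordcount-1.0.jar");                   // main jar resource (placeholder)
            args.add("org.example.WordCount");               // optional main class (placeholder)
            args.add("-Dmapreduce.job.queuename=default");   // example generic option (placeholder)
            args.add("/tmp/input /tmp/output");              // task main args (placeholder)

            // The real task then passes the joined string through
            // ParameterUtils.convertParameterPlaceholders to resolve ${...} variables.
            System.out.println(String.join(" ", args));
        }
    }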

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java (3 changed lines)

@ -44,11 +44,13 @@ public class SparkTask extends AbstractYarnTask {
/**
* spark1 command
* usage: spark-submit [options] <app jar | python file> [app arguments]
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
/**
* spark2 command
* usage: spark-submit [options] <app jar | python file> [app arguments]
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
@ -93,6 +95,7 @@ public class SparkTask extends AbstractYarnTask {
*/
@Override
protected String buildCommand() {
// spark-submit [options] <app jar | python file> [app arguments]
List<String> args = new ArrayList<>();
//spark version
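For context, buildCommand chooses between the two launcher constants based on the configured Spark version before appending the arguments built by SparkArgsUtils. The selection below is an illustrative sketch under that assumption, not the task's verbatim code.

    public class SparkSubmitLauncherSketch {

        private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
        private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";

        // Illustrative: "SPARK1" selects the Spark 1.x home, anything else Spark 2.x.
        static String selectLauncher(String sparkVersion) {
            return "SPARK1".equals(sparkVersion) ? SPARK1_COMMAND : SPARK2_COMMAND;
        }

        public static void main(String[] argv) {
            // spark-submit [options] <app jar | python file> [app arguments]
            System.out.println(selectLauncher("SPARK2") + " --master yarn --deploy-mode cluster ...");
        }
    }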

dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java (22 changed lines)

@ -98,35 +98,35 @@ public class FlinkArgsUtilsTest {
assertEquals("yarn-cluster", result.get(1));
assertEquals("-ys", result.get(2));
assertSame(Integer.valueOf(result.get(3)),slot);
assertSame(slot, Integer.valueOf(result.get(3)));
assertEquals("-ynm", result.get(4));
assertEquals(result.get(5),appName);
assertEquals(appName, result.get(5));
assertEquals("-yn", result.get(6));
assertSame(Integer.valueOf(result.get(7)),taskManager);
assertSame(taskManager, Integer.valueOf(result.get(7)));
assertEquals("-yjm", result.get(8));
assertEquals(result.get(9),jobManagerMemory);
assertEquals(jobManagerMemory, result.get(9));
assertEquals("-ytm", result.get(10));
assertEquals(result.get(11),taskManagerMemory);
assertEquals(taskManagerMemory, result.get(11));
assertEquals("-yqu", result.get(12));
assertEquals(result.get(13),queue);
assertEquals(queue, result.get(13));
assertEquals("-p", result.get(14));
assertSame(Integer.valueOf(result.get(15)),parallelism);
assertSame(parallelism, Integer.valueOf(result.get(15)));
assertEquals("-sae", result.get(16));
assertEquals(result.get(17),others);
assertEquals(others, result.get(17));
assertEquals("-c", result.get(18));
assertEquals(result.get(19),mainClass);
assertEquals(mainClass, result.get(19));
assertEquals(result.get(20),mainJar.getRes());
assertEquals(result.get(21),mainArgs);
assertEquals(mainJar.getRes(), result.get(20));
assertEquals(mainArgs, result.get(21));
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();

dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java (65 changed lines)

@ -17,19 +17,20 @@
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
/**
* Test SparkArgsUtils
*/
@ -48,12 +49,11 @@ public class SparkArgsUtilsTest {
public int executorCores = 6;
public String sparkVersion = "SPARK1";
public int numExecutors = 4;
public String appName = "spark test";
public String queue = "queue1";
@Before
public void setUp() throws Exception {
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testspark-1.0.0-SNAPSHOT.jar");
mainJar = main;
@ -78,6 +78,7 @@ public class SparkArgsUtilsTest {
param.setProgramType(programType);
param.setSparkVersion(sparkVersion);
param.setMainArgs(mainArgs);
param.setAppName(appName);
param.setQueue(queue);
//Invoke buildArgs
@ -87,42 +88,46 @@ public class SparkArgsUtilsTest {
}
//Expected values and order
assertEquals(result.size(),20);
assertEquals(22, result.size());
assertEquals("--master", result.get(0));
assertEquals("yarn", result.get(1));
assertEquals("--deploy-mode", result.get(2));
assertEquals(mode, result.get(3));
assertEquals(result.get(0),"--master");
assertEquals(result.get(1),"yarn");
assertEquals("--class", result.get(4));
assertEquals(mainClass, result.get(5));
assertEquals(result.get(2),"--deploy-mode");
assertEquals(result.get(3),mode);
assertEquals("--driver-cores", result.get(6));
assertSame(driverCores, Integer.valueOf(result.get(7)));
assertEquals(result.get(4),"--class");
assertEquals(result.get(5),mainClass);
assertEquals("--driver-memory", result.get(8));
assertEquals(driverMemory, result.get(9));
assertEquals(result.get(6),"--driver-cores");
assertSame(Integer.valueOf(result.get(7)),driverCores);
assertEquals("--num-executors", result.get(10));
assertSame(numExecutors, Integer.valueOf(result.get(11)));
assertEquals(result.get(8),"--driver-memory");
assertEquals(result.get(9),driverMemory);
assertEquals("--executor-cores", result.get(12));
assertSame(executorCores, Integer.valueOf(result.get(13)));
assertEquals(result.get(10),"--num-executors");
assertSame(Integer.valueOf(result.get(11)),numExecutors);
assertEquals("--executor-memory", result.get(14));
assertEquals(executorMemory, result.get(15));
assertEquals(result.get(12),"--executor-cores");
assertSame(Integer.valueOf(result.get(13)),executorCores);
assertEquals("--name", result.get(16));
assertEquals(ArgsUtils.escape(appName), result.get(17));
assertEquals(result.get(14),"--executor-memory");
assertEquals(result.get(15),executorMemory);
assertEquals("--queue", result.get(18));
assertEquals(queue, result.get(19));
assertEquals(result.get(16),"--queue");
assertEquals(result.get(17),queue);
assertEquals(result.get(18),mainJar.getRes());
assertEquals(result.get(19),mainArgs);
assertEquals(mainJar.getRes(), result.get(20));
assertEquals(mainArgs, result.get(21));
//Others param without --queue
SparkParameters param1 = new SparkParameters();
param1.setOthers("--files xxx/hive-site.xml");
param1.setQueue(queue);
result = SparkArgsUtils.buildArgs(param1);
assertEquals(result.size(),7);
assertEquals(7, result.size());
}
}

dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue (17 changed lines)

@ -80,6 +80,18 @@
</el-radio-group>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('App Name')}}</div>
<div slot="content">
<el-input
:disabled="isDetails"
type="input"
size="small"
v-model="appName"
:placeholder="$t('Please enter app name(optional)')">
</el-input>
</div>
</m-list-box>
<m-list-4-box>
<div slot="text">{{$t('Driver Cores')}}</div>
<div slot="content">
@ -223,6 +235,8 @@
executorMemory: '2G',
// Executor cores
executorCores: 2,
// Spark app name
appName: '',
// Main arguments
mainArgs: '',
// Option parameters
@ -448,6 +462,7 @@
numExecutors: this.numExecutors,
executorMemory: this.executorMemory,
executorCores: this.executorCores,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType,
@ -512,6 +527,7 @@
numExecutors: this.numExecutors,
executorMemory: this.executorMemory,
executorCores: this.executorCores,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType,
@ -544,6 +560,7 @@
this.numExecutors = o.params.numExecutors || 2
this.executorMemory = o.params.executorMemory || '2G'
this.executorCores = o.params.executorCores || 2
this.appName = o.params.appName || ''
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'SCALA'
