
[1.3.6-prepare][Feature-5028][MR] Support mapreduce name #5029 (#5030)

Shiwen Cheng committed 4 years ago (via GitHub)
commit 2c6b40cbd8
  1. dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java (21 lines changed)
  2. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (12 lines changed)
  3. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java (402 lines changed)
  4. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java (20 lines changed)
  5. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java (4 lines changed)
  6. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java (6 lines changed)
  7. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java (85 lines changed)
  8. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java (8 lines changed)
  9. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java (12 lines changed)
  10. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java (189 lines changed)
  11. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java (65 lines changed)
  12. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java (21 lines changed)
  13. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java (7 lines changed)
  14. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java (95 lines changed)
  15. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java (4 lines changed)
  16. dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue (17 lines changed)
  17. pom.xml (1 line changed)

21
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java

@@ -17,18 +17,20 @@
package org.apache.dolphinscheduler.api.utils;
import org.apache.commons.lang.StringUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@@ -36,18 +38,15 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Map;
import static org.junit.Assert.*;
public class CheckUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(CheckUtilsTest.class);
@@ -171,8 +170,8 @@ public class CheckUtilsTest {
sqlParameters.setSql("yy");
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sqlParameters), TaskType.SQL.toString()));
// MapreduceParameters
MapreduceParameters mapreduceParameters = new MapreduceParameters();
// MapReduceParameters
MapReduceParameters mapreduceParameters = new MapReduceParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));
ResourceInfo resourceInfoMapreduce = new ResourceInfo();

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -533,9 +533,6 @@ public final class Constants {
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
/**
* hadoop params constant
*/
/**
* jar
*/
@@ -547,12 +544,17 @@
public static final String HADOOP = "hadoop";
/**
* -D parameter
* -D <property>=<value>
*/
public static final String D = "-D";
/**
* -D mapreduce.job.queuename=ququename
* -D mapreduce.job.name=name
*/
public static final String MR_NAME = "mapreduce.job.name";
/**
* -D mapreduce.job.queuename=queuename
*/
public static final String MR_QUEUE = "mapreduce.job.queuename";
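These two properties are what the worker now turns into Hadoop generic options of the form -D<property>=<value>. A minimal, self-contained sketch of that composition (the constant values are copied from the hunk above; the task name and queue are invented):
public class MrNameOptionSketch {
    // Values mirror Constants.D, Constants.MR_NAME and Constants.MR_QUEUE above.
    private static final String D = "-D";
    private static final String MR_NAME = "mapreduce.job.name";
    private static final String MR_QUEUE = "mapreduce.job.queuename";

    public static void main(String[] args) {
        String appName = "wordcount-demo"; // hypothetical task name entered in the MR form
        String queue = "default";          // hypothetical YARN queue
        // Same format string that MapReduceArgsUtils.buildArgs uses further down.
        System.out.println(String.format("%s%s=%s", D, MR_NAME, appName));  // -Dmapreduce.job.name=wordcount-demo
        System.out.println(String.format("%s%s=%s", D, MR_QUEUE, queue));   // -Dmapreduce.job.queuename=default
    }
}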

402
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java

@@ -29,213 +29,213 @@ import java.util.List;
*/
public class FlinkParameters extends AbstractParameters {
/**
* major jar
*/
private ResourceInfo mainJar;
/**
* major class
*/
private String mainClass;
/**
* deploy mode yarn-cluster yarn-local
*/
private String deployMode;
/**
* arguments
*/
private String mainArgs;
/**
* slot count
*/
private int slot;
/**
* parallelism
*/
private int parallelism;
/**
* yarn application name
*/
private String appName;
/**
* taskManager count
*/
private int taskManager;
/**
* job manager memory
*/
private String jobManagerMemory;
/**
* task manager memory
*/
private String taskManagerMemory;
/**
* resource list
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
/**
* The YARN queue to submit to
*/
private String queue;
/**
* other arguments
*/
private String others;
/**
* flink version
*/
private String flinkVersion;
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
*/
private ProgramType programType;
public ResourceInfo getMainJar() {
return mainJar;
}
public void setMainJar(ResourceInfo mainJar) {
this.mainJar = mainJar;
}
public String getMainClass() {
return mainClass;
}
public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}
public String getDeployMode() {
return deployMode;
}
public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}
public String getMainArgs() {
return mainArgs;
}
public void setMainArgs(String mainArgs) {
this.mainArgs = mainArgs;
}
public int getSlot() {
return slot;
}
public void setSlot(int slot) {
this.slot = slot;
}
public int getParallelism() {
return parallelism;
}
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public int getTaskManager() {
return taskManager;
}
public void setTaskManager(int taskManager) {
this.taskManager = taskManager;
}
public String getJobManagerMemory() {
return jobManagerMemory;
}
public void setJobManagerMemory(String jobManagerMemory) {
this.jobManagerMemory = jobManagerMemory;
}
public String getTaskManagerMemory() {
return taskManagerMemory;
}
public void setTaskManagerMemory(String taskManagerMemory) {
this.taskManagerMemory = taskManagerMemory;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public String getOthers() {
return others;
}
public void setOthers(String others) {
this.others = others;
}
public ProgramType getProgramType() {
return programType;
}
public void setProgramType(ProgramType programType) {
this.programType = programType;
}
public String getFlinkVersion() {
return flinkVersion;
}
public void setFlinkVersion(String flinkVersion) {
this.flinkVersion = flinkVersion;
}
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
if (mainJar != null && !resourceList.contains(mainJar)) {
resourceList.add(mainJar);
}
return resourceList;
}
}

20
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java → dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java

@@ -14,17 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.mr;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
public class MapreduceParameters extends AbstractParameters {
/**
* mapreduce parameters
*/
public class MapReduceParameters extends AbstractParameters {
/**
* major jar
@@ -46,6 +49,11 @@ public class MapreduceParameters extends AbstractParameters {
*/
private String others;
/**
* app name
*/
private String appName;
/**
* queue
*/
@@ -87,6 +95,14 @@ public class MapreduceParameters extends AbstractParameters {
this.others = others;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public String getQueue() {
return queue;
}
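The UI serializes these fields to JSON and the worker later restores them with JSONUtils.parseObject (see MapReduceTask below). A hedged illustration of task params carrying the new appName field; the field names follow the diff, while the JSON values and the wrapper class are invented:
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;

public class MapReduceParamsJsonSketch {
    public static void main(String[] args) {
        // Hypothetical task-params JSON as the MR form (mr.vue) would submit after this change.
        String taskParams = "{\"mainClass\":\"com.example.WordCount\","
                + "\"mainJar\":{\"id\":0,\"res\":\"wordcount-1.0.jar\"},"
                + "\"programType\":\"JAVA\","
                + "\"appName\":\"wordcount demo\","
                + "\"mainArgs\":\"/input /output\","
                + "\"queue\":\"default\"}";
        MapReduceParameters params = JSONUtils.parseObject(taskParams, MapReduceParameters.class);
        System.out.println(params.getAppName()); // wordcount demo
    }
}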

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java

@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@@ -60,7 +60,7 @@ public class TaskParametersUtils {
case SQL:
return JSONUtils.parseObject(parameter, SqlParameters.class);
case MR:
return JSONUtils.parseObject(parameter, MapreduceParameters.class);
return JSONUtils.parseObject(parameter, MapReduceParameters.class);
case SPARK:
return JSONUtils.parseObject(parameter, SparkParameters.class);
case PYTHON:

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@@ -30,9 +30,15 @@ import java.util.List;
* flink args utils
*/
public class FlinkArgsUtils {
private static final String LOCAL_DEPLOY_MODE = "local";
private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
private FlinkArgsUtils() {
throw new IllegalStateException("Utility class");
}
/**
* build args
* @param param flink parameters

85
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* mapreduce args utils
*/
public class MapReduceArgsUtils {
private MapReduceArgsUtils() {
throw new IllegalStateException("Utility class");
}
/**
* build args
*
* @param param param
* @return argument list
*/
public static List<String> buildArgs(MapReduceParameters param) {
List<String> args = new ArrayList<>();
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(Constants.JAR);
args.add(mainJar.getRes());
}
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(mainClass);
}
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) {
args.add(String.format("%s%s=%s", Constants.D, Constants.MR_NAME, ArgsUtils.escape(appName)));
}
String others = param.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(Constants.MR_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(String.format("%s%s=%s", Constants.D, Constants.MR_QUEUE, queue));
}
}
// -conf -archives -files -libjars -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
String mainArgs = param.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
args.add(mainArgs);
}
return args;
}
}
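As a rough end-to-end picture: MapReduceTask (further down) prefixes this argument list with the hadoop command and joins it with spaces. A hedged usage sketch with invented values; the exact output of ArgsUtils.escape on the app name is not reproduced here:
import java.util.List;

import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;

public class MapReduceArgsSketch {
    public static void main(String[] args) {
        MapReduceParameters param = new MapReduceParameters();
        ResourceInfo mainJar = new ResourceInfo();
        mainJar.setRes("wordcount-1.0.jar");          // invented jar name
        param.setMainJar(mainJar);
        param.setMainClass("com.example.WordCount");  // invented main class
        param.setProgramType(ProgramType.JAVA);
        param.setAppName("wordcount demo");
        param.setQueue("default");
        param.setMainArgs("/input /output");

        List<String> built = MapReduceArgsUtils.buildArgs(param);
        // MapReduceTask.buildCommand() joins this into roughly:
        // hadoop jar wordcount-1.0.jar com.example.WordCount
        //     -Dmapreduce.job.name=<escaped app name> -Dmapreduce.job.queuename=default /input /output
        System.out.println(String.join(" ", built));
    }
}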

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java

@@ -37,6 +37,10 @@ public class SparkArgsUtils {
private static final String SPARK_ON_YARN = "yarn";
private SparkArgsUtils() {
throw new IllegalStateException("Utility class");
}
/**
* build args
*
@@ -54,9 +58,9 @@
}
args.add(deployMode);
ProgramType type = param.getProgramType();
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(Constants.MAIN_CLASS);
args.add(mainClass);
}

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@@ -16,7 +16,8 @@
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.commons.lang.StringUtils;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -27,7 +28,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@@ -38,12 +39,13 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.slf4j.Logger;
import org.apache.commons.lang.StringUtils;
import java.util.List;
import java.util.Map;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import org.slf4j.Logger;
/**
* executive task
@@ -222,7 +224,7 @@ public abstract class AbstractTask {
paramsClass = ProcedureParameters.class;
break;
case MR:
paramsClass = MapreduceParameters.class;
paramsClass = MapReduceParameters.class;
break;
case SPARK:
paramsClass = SparkParameters.class;

189
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java

@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.flink;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -21,129 +22,127 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
/**
* flink task
*/
public class FlinkTask extends AbstractYarnTask {
/**
 * flink command
 * usage: flink run [OPTIONS] <jar-file> <arguments>
 */
private static final String FLINK_COMMAND = "flink";
private static final String FLINK_RUN = "run";
/**
 * flink parameters
 */
private FlinkParameters flinkParameters;
/**
 * taskExecutionContext
 */
private TaskExecutionContext taskExecutionContext;
public FlinkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() {
logger.info("flink task params {}", taskExecutionContext.getTaskParams());
flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class);
if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs();
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
flinkParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
logger.info("param Map : {}", paramsMap);
if (paramsMap != null) {
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
logger.info("param args : {}", args);
}
flinkParameters.setMainArgs(args);
}
}
/**
 * create command
 * @return command
 */
@Override
protected String buildCommand() {
// flink run [OPTIONS] <jar-file> <arguments>
List<String> args = new ArrayList<>();
args.add(FLINK_COMMAND);
args.add(FLINK_RUN);
logger.info("flink task args : {}", args);
// other parameters
args.addAll(FlinkArgsUtils.buildArgs(flinkParameters));
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
logger.info("flink task command : {}", command);
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
flinkParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return flinkParameters;
}
}

65
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java

@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.mr;
import org.apache.dolphinscheduler.common.Constants;
@@ -22,12 +23,12 @@ import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
@@ -43,15 +44,15 @@ import org.slf4j.Logger;
public class MapReduceTask extends AbstractYarnTask {
/**
* map reduce command
* mapreduce command
* usage: hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
*/
private static final String MAP_REDUCE_COMMAND = Constants.HADOOP;
private static final String MAPREDUCE_COMMAND = Constants.HADOOP;
/**
* mapreduce parameters
*/
private MapreduceParameters mapreduceParameters;
private MapReduceParameters mapreduceParameters;
/**
* taskExecutionContext
@@ -73,10 +74,10 @@ public class MapReduceTask extends AbstractYarnTask {
logger.info("mapreduce task params {}", taskExecutionContext.getTaskParams());
this.mapreduceParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapreduceParameters.class);
this.mapreduceParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapReduceParameters.class);
// check parameters
if (!mapreduceParameters.checkParameters()) {
if (mapreduceParameters == null || !mapreduceParameters.checkParameters()) {
throw new RuntimeException("mapreduce task params is not valid");
}
@@ -103,15 +104,15 @@ public class MapReduceTask extends AbstractYarnTask {
/**
* build command
* @return command
* @throws Exception exception
*/
@Override
protected String buildCommand() throws Exception {
protected String buildCommand() {
// hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
List<String> args = new ArrayList<>();
args.add(MAP_REDUCE_COMMAND);
args.add(MAPREDUCE_COMMAND);
args.addAll(buildParameters(mapreduceParameters));
// other parameters
args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters));
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
@@ -146,46 +147,4 @@
public AbstractParameters getParameters() {
return mapreduceParameters;
}
/**
* build parameters
* @param mapreduceParameters mapreduce parameters
* @return parameter list
*/
private List<String> buildParameters(MapreduceParameters mapreduceParameters) {
List<String> result = new ArrayList<>();
// main jar
if (mapreduceParameters.getMainJar() != null) {
result.add(Constants.JAR);
result.add(mapreduceParameters.getMainJar().getRes());
}
// main class
if (!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
&& StringUtils.isNotEmpty(mapreduceParameters.getMainClass())) {
result.add(mapreduceParameters.getMainClass());
}
// others
if (StringUtils.isNotEmpty(mapreduceParameters.getOthers())) {
String others = mapreduceParameters.getOthers();
if (!others.contains(Constants.MR_QUEUE)
&& StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue()));
}
result.add(mapreduceParameters.getOthers());
} else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue()));
}
// command args
if (StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())) {
result.add(mapreduceParameters.getMainArgs());
}
return result;
}
}

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java

@@ -62,19 +62,19 @@ public class SparkTask extends AbstractYarnTask {
/**
* taskExecutionContext
*/
private final TaskExecutionContext sparkTaskExecutionContext;
private TaskExecutionContext taskExecutionContext;
public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.sparkTaskExecutionContext = taskExecutionContext;
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() {
logger.info("spark task params {}", sparkTaskExecutionContext.getTaskParams());
logger.info("spark task params {}", taskExecutionContext.getTaskParams());
sparkParameters = JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(), SparkParameters.class);
sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class);
if (null == sparkParameters) {
logger.error("Spark params is null");
@@ -84,13 +84,12 @@ public class SparkTask extends AbstractYarnTask {
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(sparkTaskExecutionContext.getQueue());
sparkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
}
/**
* create command
*
* @return command
*/
@Override
@@ -98,7 +97,7 @@
// spark-submit [options] <app jar | python file> [app arguments]
List<String> args = new ArrayList<>();
//spark version
// spark version
String sparkCommand = SPARK2_COMMAND;
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
@@ -111,11 +110,11 @@
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()),
sparkTaskExecutionContext.getDefinedParams(),
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()),
sparkTaskExecutionContext.getScheduleTime());
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
String command = null;

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@@ -53,10 +53,8 @@ public class FlinkArgsUtilsTest {
public String others = "-s hdfs:///flink/savepoint-1537";
public String flinkVersion = "<1.10";
@Before
public void setUp() throws Exception {
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testflink-1.0.0-SNAPSHOT.jar");
mainJar = main;
@@ -67,7 +65,6 @@
*/
@Test
public void testBuildArgs() {
//Define params
FlinkParameters param = new FlinkParameters();
param.setDeployMode(mode);
@@ -134,6 +131,6 @@
param1.setDeployMode(mode);
result = FlinkArgsUtils.buildArgs(param1);
assertEquals(5, result.size());
}
}

95
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test MapReduceArgsUtils
*/
public class MapReduceArgsUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(MapReduceArgsUtilsTest.class);
public String mainClass = "com.examples.WordCount";
public ResourceInfo mainJar = null;
public String mainArgs = "/user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt";
public ProgramType programType = ProgramType.JAVA;
public String others = "-files cachefile.txt -libjars mylib.jar -archives myarchive.zip -Dwordcount.case.sensitive=false";
public String appName = "mapreduce test";
public String queue = "queue1";
@Before
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testspark-1.0.0-SNAPSHOT.jar");
mainJar = main;
}
/**
* Test buildArgs
*/
@Test
public void testBuildArgs() {
//Define params
MapReduceParameters param = new MapReduceParameters();
param.setMainClass(mainClass);
param.setMainJar(mainJar);
param.setMainArgs(mainArgs);
param.setProgramType(programType);
param.setOthers(others);
param.setAppName(appName);
param.setQueue(queue);
//Invoke buildArgs
List<String> result = MapReduceArgsUtils.buildArgs(param);
for (String s : result) {
logger.info(s);
}
//Expected values and order
assertEquals(7, result.size());
assertEquals("jar", result.get(0));
assertEquals(mainJar.getRes(), result.get(1));
assertEquals(mainClass, result.get(2));
assertEquals(String.format("-D%s=%s", Constants.MR_NAME, ArgsUtils.escape(appName)), result.get(3));
assertEquals(String.format("-D%s=%s", Constants.MR_QUEUE, queue), result.get(4));
assertEquals(others, result.get(5));
assertEquals(mainArgs, result.get(6));
//Others param without --queue
param.setOthers("-files xxx/hive-site.xml");
param.setQueue(null);
result = MapReduceArgsUtils.buildArgs(param);
assertEquals(6, result.size());
}
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java

@@ -64,7 +64,6 @@ public class SparkArgsUtilsTest {
*/
@Test
public void testBuildArgs() {
//Define params
SparkParameters param = new SparkParameters();
param.setDeployMode(mode);
@@ -130,4 +129,5 @@
result = SparkArgsUtils.buildArgs(param1);
assertEquals(7, result.size());
}
}
}

17
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue

@@ -49,6 +49,18 @@
</treeselect>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('App Name')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="input"
v-model="appName"
:placeholder="$t('Please enter app name(optional)')"
autocomplete="off">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Main Arguments')}}</div>
<div slot="content">
@@ -125,6 +137,8 @@
cacheResourceList: [],
// Custom parameter
localParams: [],
// MR app name
appName: '',
// Main arguments
mainArgs: '',
// Option parameters
@@ -293,6 +307,7 @@
return {id: v}
}),
localParams: this.localParams,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType
@@ -348,6 +363,7 @@
},
resourceList: resourceIdArr,
localParams: this.localParams,
appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType
@@ -384,6 +400,7 @@
} else {
this.mainJar = o.params.mainJar.id || ''
}
this.appName = o.params.appName || ''
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'JAVA'

1
pom.xml

@@ -820,6 +820,7 @@
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/server/utils/MapReduceArgsUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
