diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java index 50e6323e21..36d5753787 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java @@ -17,37 +17,36 @@ package org.apache.dolphinscheduler.api.utils; -import org.apache.commons.lang.StringUtils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.http.HttpParameters; -import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.util.Map; + import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Map; - -import static org.junit.Assert.*; - public class CheckUtilsTest { private static final Logger logger = LoggerFactory.getLogger(CheckUtilsTest.class); @@ -171,8 +170,8 @@ public class CheckUtilsTest { sqlParameters.setSql("yy"); assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sqlParameters), TaskType.SQL.toString())); - // MapreduceParameters - MapreduceParameters mapreduceParameters = new MapreduceParameters(); + // MapReduceParameters + MapReduceParameters mapreduceParameters = new MapReduceParameters(); assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString())); ResourceInfo resourceInfoMapreduce = new ResourceInfo(); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 6e5cf66495..17412575c5 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -544,9 +544,7 @@ public final class Constants { public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10; - /** - * hadoop params * jar */ public static final String JAR = "jar"; @@ -557,12 +555,17 @@ public final class Constants { public static final String HADOOP = "hadoop"; /** - * -D parameter + * -D = */ public static final String D = "-D"; /** - * -D mapreduce.job.queuename=ququename + * -D mapreduce.job.name=name + */ + public static final String MR_NAME = "mapreduce.job.name"; + + /** + * -D mapreduce.job.queuename=queuename */ public static final String MR_QUEUE = "mapreduce.job.queuename"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java similarity index 91% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java index 5126e82e85..762ee4275a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java @@ -14,17 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.task.mr; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import java.util.ArrayList; import java.util.List; -public class MapreduceParameters extends AbstractParameters { +/** + * mapreduce parameters + */ +public class MapReduceParameters extends AbstractParameters { /** * major jar @@ -46,6 +49,11 @@ public class MapreduceParameters extends AbstractParameters { */ private String others; + /** + * app name + */ + private String appName; + /** * queue */ @@ -87,6 +95,14 @@ public class MapreduceParameters extends AbstractParameters { this.others = others; } + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } + public String getQueue() { return queue; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index af2961e628..43654d6a0f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.http.HttpParameters; -import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; @@ -71,7 +71,7 @@ public class TaskParametersUtils { case SQL: return JSONUtils.parseObject(parameter, SqlParameters.class); case MR: - return JSONUtils.parseObject(parameter, MapreduceParameters.class); + return JSONUtils.parseObject(parameter, MapReduceParameters.class); case SPARK: return JSONUtils.parseObject(parameter, SparkParameters.class); case PYTHON: diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index 8b3b691abc..dbd92e020f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -30,9 +30,15 @@ import java.util.List; * flink args utils */ public class FlinkArgsUtils { + private static final String LOCAL_DEPLOY_MODE = "local"; + private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; + private FlinkArgsUtils() { + throw new IllegalStateException("Utility class"); + } + /** * build args * @param param flink parameters diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java new file mode 100644 index 0000000000..31e182b650 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProgramType; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; +import org.apache.dolphinscheduler.common.utils.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * mapreduce args utils + */ +public class MapReduceArgsUtils { + + private MapReduceArgsUtils() { + throw new IllegalStateException("Utility class"); + } + + /** + * build args + * + * @param param param + * @return argument list + */ + public static List buildArgs(MapReduceParameters param) { + List args = new ArrayList<>(); + + ResourceInfo mainJar = param.getMainJar(); + if (mainJar != null) { + args.add(Constants.JAR); + args.add(mainJar.getRes()); + } + + ProgramType programType = param.getProgramType(); + String mainClass = param.getMainClass(); + if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { + args.add(mainClass); + } + + String appName = param.getAppName(); + if (StringUtils.isNotEmpty(appName)) { + args.add(String.format("%s%s=%s", Constants.D, Constants.MR_NAME, ArgsUtils.escape(appName))); + } + + String others = param.getOthers(); + if (StringUtils.isEmpty(others) || !others.contains(Constants.MR_QUEUE)) { + String queue = param.getQueue(); + if (StringUtils.isNotEmpty(queue)) { + args.add(String.format("%s%s=%s", Constants.D, Constants.MR_QUEUE, queue)); + } + } + + // -conf -archives -files -libjars -D + if (StringUtils.isNotEmpty(others)) { + args.add(others); + } + + String mainArgs = param.getMainArgs(); + if (StringUtils.isNotEmpty(mainArgs)) { + args.add(mainArgs); + } + + return args; + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java index 76828f346d..4d0fb2a5b6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java @@ -37,6 +37,10 @@ public class SparkArgsUtils { private static final String SPARK_ON_YARN = "yarn"; + private SparkArgsUtils() { + throw new IllegalStateException("Utility class"); + } + /** * build args * @@ -54,9 +58,9 @@ public class SparkArgsUtils { } args.add(deployMode); - ProgramType type = param.getProgramType(); + ProgramType programType = param.getProgramType(); String mainClass = param.getMainClass(); - if (type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { + if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { args.add(Constants.MAIN_CLASS); args.add(mainClass); } @@ -126,4 +130,4 @@ public class SparkArgsUtils { return args; } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index 9de28e3e27..4d34190052 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.flink; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -21,129 +22,127 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; -import org.slf4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.slf4j.Logger; + /** * flink task */ public class FlinkTask extends AbstractYarnTask { - /** - * flink command - * usage: flink run [OPTIONS] - */ - private static final String FLINK_COMMAND = "flink"; - private static final String FLINK_RUN = "run"; - - /** - * flink parameters - */ - private FlinkParameters flinkParameters; - - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; - - public FlinkTask(TaskExecutionContext taskExecutionContext, Logger logger) { - super(taskExecutionContext, logger); - this.taskExecutionContext = taskExecutionContext; - } + /** + * flink command + * usage: flink run [OPTIONS] + */ + private static final String FLINK_COMMAND = "flink"; + private static final String FLINK_RUN = "run"; + + /** + * flink parameters + */ + private FlinkParameters flinkParameters; + + /** + * taskExecutionContext + */ + private TaskExecutionContext taskExecutionContext; + + public FlinkTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + this.taskExecutionContext = taskExecutionContext; + } - @Override - public void init() { + @Override + public void init() { - logger.info("flink task params {}", taskExecutionContext.getTaskParams()); + logger.info("flink task params {}", taskExecutionContext.getTaskParams()); - flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class); + flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class); - if (!flinkParameters.checkParameters()) { - throw new RuntimeException("flink task params is not valid"); + if (flinkParameters == null || !flinkParameters.checkParameters()) { + throw new RuntimeException("flink task params is not valid"); + } + flinkParameters.setQueue(taskExecutionContext.getQueue()); + setMainJarName(); + + if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { + String args = flinkParameters.getMainArgs(); + + // replace placeholder + Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), + taskExecutionContext.getDefinedParams(), + flinkParameters.getLocalParametersMap(), + CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), + taskExecutionContext.getScheduleTime()); + + logger.info("param Map : {}", paramsMap); + if (paramsMap != null) { + args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); + logger.info("param args : {}", args); + } + flinkParameters.setMainArgs(args); + } } - flinkParameters.setQueue(taskExecutionContext.getQueue()); - setMainJarName(); + /** + * create command + * @return command + */ + @Override + protected String buildCommand() { + // flink run [OPTIONS] + List args = new ArrayList<>(); - if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { - String args = flinkParameters.getMainArgs(); + args.add(FLINK_COMMAND); + args.add(FLINK_RUN); + logger.info("flink task args : {}", args); + // other parameters + args.addAll(FlinkArgsUtils.buildArgs(flinkParameters)); + String command = ParameterUtils + .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - flinkParameters.getLocalParametersMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + logger.info("flink task command : {}", command); - logger.info("param Map : {}", paramsMap); - if (paramsMap != null ){ - - args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); - logger.info("param args : {}", args); - } - flinkParameters.setMainArgs(args); + return command; } - } - - /** - * create command - * @return command - */ - @Override - protected String buildCommand() { - // flink run [OPTIONS] - List args = new ArrayList<>(); - - args.add(FLINK_COMMAND); - args.add(FLINK_RUN); - logger.info("flink task args : {}", args); - // other parameters - args.addAll(FlinkArgsUtils.buildArgs(flinkParameters)); - - String command = ParameterUtils - .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); - - logger.info("flink task command : {}", command); - - return command; - } - - @Override - protected void setMainJarName() { - // main jar - ResourceInfo mainJar = flinkParameters.getMainJar(); - if (mainJar != null) { - int resourceId = mainJar.getId(); - String resourceName; - if (resourceId == 0) { - resourceName = mainJar.getRes(); - } else { - Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId()); - if (resource == null) { - logger.error("resource id: {} not exist", resourceId); - throw new RuntimeException(String.format("resource id: %d not exist", resourceId)); + + @Override + protected void setMainJarName() { + // main jar + ResourceInfo mainJar = flinkParameters.getMainJar(); + if (mainJar != null) { + int resourceId = mainJar.getId(); + String resourceName; + if (resourceId == 0) { + resourceName = mainJar.getRes(); + } else { + Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId()); + if (resource == null) { + logger.error("resource id: {} not exist", resourceId); + throw new RuntimeException(String.format("resource id: %d not exist", resourceId)); + } + resourceName = resource.getFullName().replaceFirst("/", ""); + } + mainJar.setRes(resourceName); + flinkParameters.setMainJar(mainJar); } - resourceName = resource.getFullName().replaceFirst("/", ""); - } - mainJar.setRes(resourceName); - flinkParameters.setMainJar(mainJar); } - } - @Override - public AbstractParameters getParameters() { - return flinkParameters; - } + @Override + public AbstractParameters getParameters() { + return flinkParameters; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index 43a7079dcf..f60b1cb426 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.mr; import org.apache.dolphinscheduler.common.Constants; @@ -22,12 +23,12 @@ import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; @@ -43,15 +44,15 @@ import org.slf4j.Logger; public class MapReduceTask extends AbstractYarnTask { /** - * map reduce command + * mapreduce command * usage: hadoop jar [mainClass] [GENERIC_OPTIONS] args... */ - private static final String MAP_REDUCE_COMMAND = Constants.HADOOP; + private static final String MAPREDUCE_COMMAND = Constants.HADOOP; /** * mapreduce parameters */ - private MapreduceParameters mapreduceParameters; + private MapReduceParameters mapreduceParameters; /** * taskExecutionContext @@ -73,10 +74,10 @@ public class MapReduceTask extends AbstractYarnTask { logger.info("mapreduce task params {}", taskExecutionContext.getTaskParams()); - this.mapreduceParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapreduceParameters.class); + this.mapreduceParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapReduceParameters.class); // check parameters - if (!mapreduceParameters.checkParameters()) { + if (mapreduceParameters == null || !mapreduceParameters.checkParameters()) { throw new RuntimeException("mapreduce task params is not valid"); } @@ -103,15 +104,15 @@ public class MapReduceTask extends AbstractYarnTask { /** * build command * @return command - * @throws Exception exception */ @Override - protected String buildCommand() throws Exception { + protected String buildCommand() { // hadoop jar [mainClass] [GENERIC_OPTIONS] args... List args = new ArrayList<>(); - args.add(MAP_REDUCE_COMMAND); + args.add(MAPREDUCE_COMMAND); - args.addAll(buildParameters(mapreduceParameters)); + // other parameters + args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters)); String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); @@ -146,46 +147,4 @@ public class MapReduceTask extends AbstractYarnTask { public AbstractParameters getParameters() { return mapreduceParameters; } - - /** - * build parameters - * @param mapreduceParameters mapreduce parameters - * @return parameter list - */ - private List buildParameters(MapreduceParameters mapreduceParameters) { - List result = new ArrayList<>(); - - // main jar - if (mapreduceParameters.getMainJar() != null) { - result.add(Constants.JAR); - result.add(mapreduceParameters.getMainJar().getRes()); - } - - // main class - if (!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType()) - && StringUtils.isNotEmpty(mapreduceParameters.getMainClass())) { - result.add(mapreduceParameters.getMainClass()); - } - - // others - if (StringUtils.isNotEmpty(mapreduceParameters.getOthers())) { - String others = mapreduceParameters.getOthers(); - if (!others.contains(Constants.MR_QUEUE) - && StringUtils.isNotEmpty(mapreduceParameters.getQueue())) { - result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue())); - } - - result.add(mapreduceParameters.getOthers()); - } else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) { - result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue())); - - } - - // command args - if (StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())) { - result.add(mapreduceParameters.getMainArgs()); - } - return result; - } } - diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index 3a27399d2e..f6fec0fe48 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -62,19 +62,19 @@ public class SparkTask extends AbstractYarnTask { /** * taskExecutionContext */ - private final TaskExecutionContext sparkTaskExecutionContext; + private TaskExecutionContext taskExecutionContext; public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) { super(taskExecutionContext, logger); - this.sparkTaskExecutionContext = taskExecutionContext; + this.taskExecutionContext = taskExecutionContext; } @Override public void init() { - logger.info("spark task params {}", sparkTaskExecutionContext.getTaskParams()); + logger.info("spark task params {}", taskExecutionContext.getTaskParams()); - sparkParameters = JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(), SparkParameters.class); + sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class); if (null == sparkParameters) { logger.error("Spark params is null"); @@ -84,13 +84,12 @@ public class SparkTask extends AbstractYarnTask { if (!sparkParameters.checkParameters()) { throw new RuntimeException("spark task params is not valid"); } - sparkParameters.setQueue(sparkTaskExecutionContext.getQueue()); + sparkParameters.setQueue(taskExecutionContext.getQueue()); setMainJarName(); } /** * create command - * * @return command */ @Override @@ -98,7 +97,7 @@ public class SparkTask extends AbstractYarnTask { // spark-submit [options] [app arguments] List args = new ArrayList<>(); - //spark version + // spark version String sparkCommand = SPARK2_COMMAND; if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) { @@ -111,11 +110,11 @@ public class SparkTask extends AbstractYarnTask { args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()), - sparkTaskExecutionContext.getDefinedParams(), + Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), + taskExecutionContext.getDefinedParams(), sparkParameters.getLocalParametersMap(), - CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()), - sparkTaskExecutionContext.getScheduleTime()); + CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), + taskExecutionContext.getScheduleTime()); String command = null; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java index dd12029765..88437a1102 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java @@ -53,10 +53,8 @@ public class FlinkArgsUtilsTest { public String others = "-s hdfs:///flink/savepoint-1537"; public String flinkVersion = "<1.10"; - @Before - public void setUp() throws Exception { - + public void setUp() { ResourceInfo main = new ResourceInfo(); main.setRes("testflink-1.0.0-SNAPSHOT.jar"); mainJar = main; @@ -67,7 +65,6 @@ public class FlinkArgsUtilsTest { */ @Test public void testBuildArgs() { - //Define params FlinkParameters param = new FlinkParameters(); param.setDeployMode(mode); @@ -134,6 +131,6 @@ public class FlinkArgsUtilsTest { param1.setDeployMode(mode); result = FlinkArgsUtils.buildArgs(param1); assertEquals(5, result.size()); - } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java new file mode 100644 index 0000000000..eb68672e07 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import static org.junit.Assert.assertEquals; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProgramType; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; + +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test MapReduceArgsUtils + */ +public class MapReduceArgsUtilsTest { + + private static final Logger logger = LoggerFactory.getLogger(MapReduceArgsUtilsTest.class); + + public String mainClass = "com.examples.WordCount"; + public ResourceInfo mainJar = null; + public String mainArgs = "/user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt"; + public ProgramType programType = ProgramType.JAVA; + public String others = "-files cachefile.txt -libjars mylib.jar -archives myarchive.zip -Dwordcount.case.sensitive=false"; + public String appName = "mapreduce test"; + public String queue = "queue1"; + + @Before + public void setUp() { + ResourceInfo main = new ResourceInfo(); + main.setRes("testspark-1.0.0-SNAPSHOT.jar"); + mainJar = main; + } + + /** + * Test buildArgs + */ + @Test + public void testBuildArgs() { + //Define params + MapReduceParameters param = new MapReduceParameters(); + param.setMainClass(mainClass); + param.setMainJar(mainJar); + param.setMainArgs(mainArgs); + param.setProgramType(programType); + param.setOthers(others); + param.setAppName(appName); + param.setQueue(queue); + + //Invoke buildArgs + List result = MapReduceArgsUtils.buildArgs(param); + for (String s : result) { + logger.info(s); + } + + //Expected values and order + assertEquals(7, result.size()); + + assertEquals("jar", result.get(0)); + assertEquals(mainJar.getRes(), result.get(1)); + assertEquals(mainClass, result.get(2)); + assertEquals(String.format("-D%s=%s", Constants.MR_NAME, ArgsUtils.escape(appName)), result.get(3)); + assertEquals(String.format("-D%s=%s", Constants.MR_QUEUE, queue), result.get(4)); + assertEquals(others, result.get(5)); + assertEquals(mainArgs, result.get(6)); + + //Others param without --queue + param.setOthers("-files xxx/hive-site.xml"); + param.setQueue(null); + result = MapReduceArgsUtils.buildArgs(param); + assertEquals(6, result.size()); + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java index f76c2eaa31..7e05cec30b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java @@ -64,7 +64,6 @@ public class SparkArgsUtilsTest { */ @Test public void testBuildArgs() { - //Define params SparkParameters param = new SparkParameters(); param.setDeployMode(mode); @@ -130,4 +129,5 @@ public class SparkArgsUtilsTest { result = SparkArgsUtils.buildArgs(param1); assertEquals(7, result.size()); } + } \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue index 4581553989..ba50195b39 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue @@ -49,6 +49,18 @@ + +
{{$t('App Name')}}
+
+ + +
+
{{$t('Main Arguments')}}
@@ -122,6 +134,8 @@ cacheResourceList: [], // Custom parameter localParams: [], + // MR app name + appName: '', // Main arguments mainArgs: '', // Option parameters @@ -282,6 +296,7 @@ return { id: v } }), localParams: this.localParams, + appName: this.appName, mainArgs: this.mainArgs, others: this.others, programType: this.programType @@ -342,6 +357,7 @@ }, resourceList: this.resourceIdArr, localParams: this.localParams, + appName: this.appName, mainArgs: this.mainArgs, others: this.others, programType: this.programType @@ -367,6 +383,7 @@ } else { this.mainJar = o.params.mainJar.id || '' } + this.appName = o.params.appName || '' this.mainArgs = o.params.mainArgs || '' this.others = o.params.others this.programType = o.params.programType || 'JAVA' diff --git a/pom.xml b/pom.xml index 2829dca7ac..2ab57bcde6 100644 --- a/pom.xml +++ b/pom.xml @@ -936,6 +936,7 @@ **/server/utils/HostTest.java **/server/utils/FlinkArgsUtilsTest.java **/server/utils/LogUtilsTest.java + **/server/utils/MapReduceArgsUtilsTest.java **/server/utils/ParamUtilsTest.java **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java