From 2c6b40cbd8e9af999fc1e2805a795d829982f39b Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Thu, 11 Mar 2021 12:53:46 +0800 Subject: [PATCH] [1.3.6-prepare][Feature-5028][MR] Support mapreduce name #5029 (#5030) --- .../api/utils/CheckUtilsTest.java | 21 +- .../dolphinscheduler/common/Constants.java | 12 +- .../common/task/flink/FlinkParameters.java | 402 +++++++++--------- ...rameters.java => MapReduceParameters.java} | 20 +- .../common/utils/TaskParametersUtils.java | 4 +- .../server/utils/FlinkArgsUtils.java | 6 + .../server/utils/MapReduceArgsUtils.java | 85 ++++ .../server/utils/SparkArgsUtils.java | 8 +- .../server/worker/task/AbstractTask.java | 12 +- .../server/worker/task/flink/FlinkTask.java | 189 ++++---- .../server/worker/task/mr/MapReduceTask.java | 65 +-- .../server/worker/task/spark/SparkTask.java | 21 +- .../server/utils/FlinkArgsUtilsTest.java | 7 +- .../server/utils/MapReduceArgsUtilsTest.java | 95 +++++ .../server/utils/SparkArgsUtilsTest.java | 4 +- .../pages/dag/_source/formModel/tasks/mr.vue | 17 + pom.xml | 1 + 17 files changed, 575 insertions(+), 394 deletions(-) rename dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/{MapreduceParameters.java => MapReduceParameters.java} (91%) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java index ccc231fcf6..36d5753787 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java @@ -17,18 +17,20 @@ package org.apache.dolphinscheduler.api.utils; -import org.apache.commons.lang.StringUtils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.http.HttpParameters; -import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; @@ -36,18 +38,15 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; + +import java.util.Map; + import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Map; - -import static org.junit.Assert.*; - public class CheckUtilsTest { private static final Logger logger = LoggerFactory.getLogger(CheckUtilsTest.class); @@ -171,8 +170,8 @@ public class CheckUtilsTest { sqlParameters.setSql("yy"); assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sqlParameters), TaskType.SQL.toString())); - // MapreduceParameters - MapreduceParameters mapreduceParameters = new MapreduceParameters(); + // MapReduceParameters + MapReduceParameters mapreduceParameters = new MapReduceParameters(); assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString())); ResourceInfo resourceInfoMapreduce = new ResourceInfo(); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 0f0a181996..b02c8dd80c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -533,9 +533,6 @@ public final class Constants { public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10; - /** - * hadoop params constant - */ /** * jar */ @@ -547,12 +544,17 @@ public final class Constants { public static final String HADOOP = "hadoop"; /** - * -D parameter + * -D = */ public static final String D = "-D"; /** - * -D mapreduce.job.queuename=ququename + * -D mapreduce.job.name=name + */ + public static final String MR_NAME = "mapreduce.job.name"; + + /** + * -D mapreduce.job.queuename=queuename */ public static final String MR_QUEUE = "mapreduce.job.queuename"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java index 82613f2963..76d68c0460 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java @@ -29,213 +29,213 @@ import java.util.List; */ public class FlinkParameters extends AbstractParameters { - /** - * major jar - */ - private ResourceInfo mainJar; - - /** - * major class - */ - private String mainClass; - - /** - * deploy mode yarn-cluster yarn-local - */ - private String deployMode; - - /** - * arguments - */ - private String mainArgs; - - /** - * slot count - */ - private int slot; - - /** - * parallelism - */ - private int parallelism; - - /** - * yarn application name - */ - private String appName; - - /** - * taskManager count - */ - private int taskManager; - - /** - * job manager memory - */ - private String jobManagerMemory; - - /** - * task manager memory - */ - private String taskManagerMemory; - - /** - * resource list - */ - private List resourceList = new ArrayList<>(); - - /** - * The YARN queue to submit to - */ - private String queue; - - /** - * other arguments - */ - private String others; - - /** - * flink version - */ - private String flinkVersion; - - /** - * program type - * 0 JAVA,1 SCALA,2 PYTHON - */ - private ProgramType programType; - - public ResourceInfo getMainJar() { - return mainJar; - } - - public void setMainJar(ResourceInfo mainJar) { - this.mainJar = mainJar; - } - - public String getMainClass() { - return mainClass; - } - - public void setMainClass(String mainClass) { - this.mainClass = mainClass; - } - - public String getDeployMode() { - return deployMode; - } - - public void setDeployMode(String deployMode) { - this.deployMode = deployMode; - } - - public String getMainArgs() { - return mainArgs; - } - - public void setMainArgs(String mainArgs) { - this.mainArgs = mainArgs; - } - - public int getSlot() { - return slot; - } - - public void setSlot(int slot) { - this.slot = slot; - } - - public int getParallelism() { - return parallelism; - } - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - public String getAppName() { - return appName; - } - - public void setAppName(String appName) { - this.appName = appName; - } - - public int getTaskManager() { - return taskManager; - } - - public void setTaskManager(int taskManager) { - this.taskManager = taskManager; - } - - public String getJobManagerMemory() { - return jobManagerMemory; - } - - public void setJobManagerMemory(String jobManagerMemory) { - this.jobManagerMemory = jobManagerMemory; - } - - public String getTaskManagerMemory() { - return taskManagerMemory; - } - - public void setTaskManagerMemory(String taskManagerMemory) { - this.taskManagerMemory = taskManagerMemory; - } - - public String getQueue() { - return queue; - } - - public void setQueue(String queue) { - this.queue = queue; - } - - public List getResourceList() { - return resourceList; - } - - public void setResourceList(List resourceList) { - this.resourceList = resourceList; - } - - public String getOthers() { - return others; - } + /** + * major jar + */ + private ResourceInfo mainJar; + + /** + * major class + */ + private String mainClass; + + /** + * deploy mode yarn-cluster yarn-local + */ + private String deployMode; + + /** + * arguments + */ + private String mainArgs; + + /** + * slot count + */ + private int slot; + + /** + * parallelism + */ + private int parallelism; + + /** + * yarn application name + */ + private String appName; + + /** + * taskManager count + */ + private int taskManager; + + /** + * job manager memory + */ + private String jobManagerMemory; + + /** + * task manager memory + */ + private String taskManagerMemory; + + /** + * resource list + */ + private List resourceList = new ArrayList<>(); + + /** + * The YARN queue to submit to + */ + private String queue; + + /** + * other arguments + */ + private String others; + + /** + * flink version + */ + private String flinkVersion; + + /** + * program type + * 0 JAVA,1 SCALA,2 PYTHON + */ + private ProgramType programType; + + public ResourceInfo getMainJar() { + return mainJar; + } + + public void setMainJar(ResourceInfo mainJar) { + this.mainJar = mainJar; + } + + public String getMainClass() { + return mainClass; + } + + public void setMainClass(String mainClass) { + this.mainClass = mainClass; + } + + public String getDeployMode() { + return deployMode; + } + + public void setDeployMode(String deployMode) { + this.deployMode = deployMode; + } + + public String getMainArgs() { + return mainArgs; + } + + public void setMainArgs(String mainArgs) { + this.mainArgs = mainArgs; + } + + public int getSlot() { + return slot; + } + + public void setSlot(int slot) { + this.slot = slot; + } + + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } + + public int getTaskManager() { + return taskManager; + } + + public void setTaskManager(int taskManager) { + this.taskManager = taskManager; + } + + public String getJobManagerMemory() { + return jobManagerMemory; + } + + public void setJobManagerMemory(String jobManagerMemory) { + this.jobManagerMemory = jobManagerMemory; + } + + public String getTaskManagerMemory() { + return taskManagerMemory; + } + + public void setTaskManagerMemory(String taskManagerMemory) { + this.taskManagerMemory = taskManagerMemory; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public List getResourceList() { + return resourceList; + } + + public void setResourceList(List resourceList) { + this.resourceList = resourceList; + } + + public String getOthers() { + return others; + } - public void setOthers(String others) { - this.others = others; - } + public void setOthers(String others) { + this.others = others; + } - public ProgramType getProgramType() { - return programType; - } + public ProgramType getProgramType() { + return programType; + } - public void setProgramType(ProgramType programType) { - this.programType = programType; - } + public void setProgramType(ProgramType programType) { + this.programType = programType; + } - public String getFlinkVersion() { - return flinkVersion; - } + public String getFlinkVersion() { + return flinkVersion; + } - public void setFlinkVersion(String flinkVersion) { - this.flinkVersion = flinkVersion; - } + public void setFlinkVersion(String flinkVersion) { + this.flinkVersion = flinkVersion; + } + + @Override + public boolean checkParameters() { + return mainJar != null && programType != null; + } - @Override - public boolean checkParameters() { - return mainJar != null && programType != null; - } - - @Override - public List getResourceFilesList() { - if (mainJar != null && !resourceList.contains(mainJar)) { - resourceList.add(mainJar); + @Override + public List getResourceFilesList() { + if (mainJar != null && !resourceList.contains(mainJar)) { + resourceList.add(mainJar); + } + return resourceList; } - return resourceList; - } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java similarity index 91% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java index 5126e82e85..762ee4275a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapReduceParameters.java @@ -14,17 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.task.mr; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import java.util.ArrayList; import java.util.List; -public class MapreduceParameters extends AbstractParameters { +/** + * mapreduce parameters + */ +public class MapReduceParameters extends AbstractParameters { /** * major jar @@ -46,6 +49,11 @@ public class MapreduceParameters extends AbstractParameters { */ private String others; + /** + * app name + */ + private String appName; + /** * queue */ @@ -87,6 +95,14 @@ public class MapreduceParameters extends AbstractParameters { this.others = others; } + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } + public String getQueue() { return queue; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index a3492f49fa..021748e3a0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.http.HttpParameters; -import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; @@ -60,7 +60,7 @@ public class TaskParametersUtils { case SQL: return JSONUtils.parseObject(parameter, SqlParameters.class); case MR: - return JSONUtils.parseObject(parameter, MapreduceParameters.class); + return JSONUtils.parseObject(parameter, MapReduceParameters.class); case SPARK: return JSONUtils.parseObject(parameter, SparkParameters.class); case PYTHON: diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index 8b3b691abc..dbd92e020f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -30,9 +30,15 @@ import java.util.List; * flink args utils */ public class FlinkArgsUtils { + private static final String LOCAL_DEPLOY_MODE = "local"; + private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; + private FlinkArgsUtils() { + throw new IllegalStateException("Utility class"); + } + /** * build args * @param param flink parameters diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java new file mode 100644 index 0000000000..31e182b650 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProgramType; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; +import org.apache.dolphinscheduler.common.utils.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * mapreduce args utils + */ +public class MapReduceArgsUtils { + + private MapReduceArgsUtils() { + throw new IllegalStateException("Utility class"); + } + + /** + * build args + * + * @param param param + * @return argument list + */ + public static List buildArgs(MapReduceParameters param) { + List args = new ArrayList<>(); + + ResourceInfo mainJar = param.getMainJar(); + if (mainJar != null) { + args.add(Constants.JAR); + args.add(mainJar.getRes()); + } + + ProgramType programType = param.getProgramType(); + String mainClass = param.getMainClass(); + if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { + args.add(mainClass); + } + + String appName = param.getAppName(); + if (StringUtils.isNotEmpty(appName)) { + args.add(String.format("%s%s=%s", Constants.D, Constants.MR_NAME, ArgsUtils.escape(appName))); + } + + String others = param.getOthers(); + if (StringUtils.isEmpty(others) || !others.contains(Constants.MR_QUEUE)) { + String queue = param.getQueue(); + if (StringUtils.isNotEmpty(queue)) { + args.add(String.format("%s%s=%s", Constants.D, Constants.MR_QUEUE, queue)); + } + } + + // -conf -archives -files -libjars -D + if (StringUtils.isNotEmpty(others)) { + args.add(others); + } + + String mainArgs = param.getMainArgs(); + if (StringUtils.isNotEmpty(mainArgs)) { + args.add(mainArgs); + } + + return args; + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java index 76828f346d..1a9aaa692b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java @@ -37,6 +37,10 @@ public class SparkArgsUtils { private static final String SPARK_ON_YARN = "yarn"; + private SparkArgsUtils() { + throw new IllegalStateException("Utility class"); + } + /** * build args * @@ -54,9 +58,9 @@ public class SparkArgsUtils { } args.add(deployMode); - ProgramType type = param.getProgramType(); + ProgramType programType = param.getProgramType(); String mainClass = param.getMainClass(); - if (type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { + if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { args.add(Constants.MAIN_CLASS); args.add(mainClass); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index a892b21c33..84c5052f29 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -16,7 +16,8 @@ */ package org.apache.dolphinscheduler.server.worker.task; -import org.apache.commons.lang.StringUtils; +import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -27,7 +28,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; -import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; @@ -38,12 +39,13 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.TaskRecordDao; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; -import org.slf4j.Logger; + +import org.apache.commons.lang.StringUtils; import java.util.List; import java.util.Map; -import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; +import org.slf4j.Logger; /** * executive task @@ -222,7 +224,7 @@ public abstract class AbstractTask { paramsClass = ProcedureParameters.class; break; case MR: - paramsClass = MapreduceParameters.class; + paramsClass = MapReduceParameters.class; break; case SPARK: paramsClass = SparkParameters.class; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index 9de28e3e27..4d34190052 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.flink; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -21,129 +22,127 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; -import org.slf4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.slf4j.Logger; + /** * flink task */ public class FlinkTask extends AbstractYarnTask { - /** - * flink command - * usage: flink run [OPTIONS] - */ - private static final String FLINK_COMMAND = "flink"; - private static final String FLINK_RUN = "run"; - - /** - * flink parameters - */ - private FlinkParameters flinkParameters; - - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; - - public FlinkTask(TaskExecutionContext taskExecutionContext, Logger logger) { - super(taskExecutionContext, logger); - this.taskExecutionContext = taskExecutionContext; - } + /** + * flink command + * usage: flink run [OPTIONS] + */ + private static final String FLINK_COMMAND = "flink"; + private static final String FLINK_RUN = "run"; + + /** + * flink parameters + */ + private FlinkParameters flinkParameters; + + /** + * taskExecutionContext + */ + private TaskExecutionContext taskExecutionContext; + + public FlinkTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + this.taskExecutionContext = taskExecutionContext; + } - @Override - public void init() { + @Override + public void init() { - logger.info("flink task params {}", taskExecutionContext.getTaskParams()); + logger.info("flink task params {}", taskExecutionContext.getTaskParams()); - flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class); + flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class); - if (!flinkParameters.checkParameters()) { - throw new RuntimeException("flink task params is not valid"); + if (flinkParameters == null || !flinkParameters.checkParameters()) { + throw new RuntimeException("flink task params is not valid"); + } + flinkParameters.setQueue(taskExecutionContext.getQueue()); + setMainJarName(); + + if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { + String args = flinkParameters.getMainArgs(); + + // replace placeholder + Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), + taskExecutionContext.getDefinedParams(), + flinkParameters.getLocalParametersMap(), + CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), + taskExecutionContext.getScheduleTime()); + + logger.info("param Map : {}", paramsMap); + if (paramsMap != null) { + args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); + logger.info("param args : {}", args); + } + flinkParameters.setMainArgs(args); + } } - flinkParameters.setQueue(taskExecutionContext.getQueue()); - setMainJarName(); + /** + * create command + * @return command + */ + @Override + protected String buildCommand() { + // flink run [OPTIONS] + List args = new ArrayList<>(); - if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { - String args = flinkParameters.getMainArgs(); + args.add(FLINK_COMMAND); + args.add(FLINK_RUN); + logger.info("flink task args : {}", args); + // other parameters + args.addAll(FlinkArgsUtils.buildArgs(flinkParameters)); + String command = ParameterUtils + .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - flinkParameters.getLocalParametersMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + logger.info("flink task command : {}", command); - logger.info("param Map : {}", paramsMap); - if (paramsMap != null ){ - - args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); - logger.info("param args : {}", args); - } - flinkParameters.setMainArgs(args); + return command; } - } - - /** - * create command - * @return command - */ - @Override - protected String buildCommand() { - // flink run [OPTIONS] - List args = new ArrayList<>(); - - args.add(FLINK_COMMAND); - args.add(FLINK_RUN); - logger.info("flink task args : {}", args); - // other parameters - args.addAll(FlinkArgsUtils.buildArgs(flinkParameters)); - - String command = ParameterUtils - .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); - - logger.info("flink task command : {}", command); - - return command; - } - - @Override - protected void setMainJarName() { - // main jar - ResourceInfo mainJar = flinkParameters.getMainJar(); - if (mainJar != null) { - int resourceId = mainJar.getId(); - String resourceName; - if (resourceId == 0) { - resourceName = mainJar.getRes(); - } else { - Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId()); - if (resource == null) { - logger.error("resource id: {} not exist", resourceId); - throw new RuntimeException(String.format("resource id: %d not exist", resourceId)); + + @Override + protected void setMainJarName() { + // main jar + ResourceInfo mainJar = flinkParameters.getMainJar(); + if (mainJar != null) { + int resourceId = mainJar.getId(); + String resourceName; + if (resourceId == 0) { + resourceName = mainJar.getRes(); + } else { + Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId()); + if (resource == null) { + logger.error("resource id: {} not exist", resourceId); + throw new RuntimeException(String.format("resource id: %d not exist", resourceId)); + } + resourceName = resource.getFullName().replaceFirst("/", ""); + } + mainJar.setRes(resourceName); + flinkParameters.setMainJar(mainJar); } - resourceName = resource.getFullName().replaceFirst("/", ""); - } - mainJar.setRes(resourceName); - flinkParameters.setMainJar(mainJar); } - } - @Override - public AbstractParameters getParameters() { - return flinkParameters; - } + @Override + public AbstractParameters getParameters() { + return flinkParameters; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index 43a7079dcf..f60b1cb426 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.mr; import org.apache.dolphinscheduler.common.Constants; @@ -22,12 +23,12 @@ import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; @@ -43,15 +44,15 @@ import org.slf4j.Logger; public class MapReduceTask extends AbstractYarnTask { /** - * map reduce command + * mapreduce command * usage: hadoop jar [mainClass] [GENERIC_OPTIONS] args... */ - private static final String MAP_REDUCE_COMMAND = Constants.HADOOP; + private static final String MAPREDUCE_COMMAND = Constants.HADOOP; /** * mapreduce parameters */ - private MapreduceParameters mapreduceParameters; + private MapReduceParameters mapreduceParameters; /** * taskExecutionContext @@ -73,10 +74,10 @@ public class MapReduceTask extends AbstractYarnTask { logger.info("mapreduce task params {}", taskExecutionContext.getTaskParams()); - this.mapreduceParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapreduceParameters.class); + this.mapreduceParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapReduceParameters.class); // check parameters - if (!mapreduceParameters.checkParameters()) { + if (mapreduceParameters == null || !mapreduceParameters.checkParameters()) { throw new RuntimeException("mapreduce task params is not valid"); } @@ -103,15 +104,15 @@ public class MapReduceTask extends AbstractYarnTask { /** * build command * @return command - * @throws Exception exception */ @Override - protected String buildCommand() throws Exception { + protected String buildCommand() { // hadoop jar [mainClass] [GENERIC_OPTIONS] args... List args = new ArrayList<>(); - args.add(MAP_REDUCE_COMMAND); + args.add(MAPREDUCE_COMMAND); - args.addAll(buildParameters(mapreduceParameters)); + // other parameters + args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters)); String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); @@ -146,46 +147,4 @@ public class MapReduceTask extends AbstractYarnTask { public AbstractParameters getParameters() { return mapreduceParameters; } - - /** - * build parameters - * @param mapreduceParameters mapreduce parameters - * @return parameter list - */ - private List buildParameters(MapreduceParameters mapreduceParameters) { - List result = new ArrayList<>(); - - // main jar - if (mapreduceParameters.getMainJar() != null) { - result.add(Constants.JAR); - result.add(mapreduceParameters.getMainJar().getRes()); - } - - // main class - if (!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType()) - && StringUtils.isNotEmpty(mapreduceParameters.getMainClass())) { - result.add(mapreduceParameters.getMainClass()); - } - - // others - if (StringUtils.isNotEmpty(mapreduceParameters.getOthers())) { - String others = mapreduceParameters.getOthers(); - if (!others.contains(Constants.MR_QUEUE) - && StringUtils.isNotEmpty(mapreduceParameters.getQueue())) { - result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue())); - } - - result.add(mapreduceParameters.getOthers()); - } else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) { - result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue())); - - } - - // command args - if (StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())) { - result.add(mapreduceParameters.getMainArgs()); - } - return result; - } } - diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index 3a27399d2e..f6fec0fe48 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -62,19 +62,19 @@ public class SparkTask extends AbstractYarnTask { /** * taskExecutionContext */ - private final TaskExecutionContext sparkTaskExecutionContext; + private TaskExecutionContext taskExecutionContext; public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) { super(taskExecutionContext, logger); - this.sparkTaskExecutionContext = taskExecutionContext; + this.taskExecutionContext = taskExecutionContext; } @Override public void init() { - logger.info("spark task params {}", sparkTaskExecutionContext.getTaskParams()); + logger.info("spark task params {}", taskExecutionContext.getTaskParams()); - sparkParameters = JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(), SparkParameters.class); + sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class); if (null == sparkParameters) { logger.error("Spark params is null"); @@ -84,13 +84,12 @@ public class SparkTask extends AbstractYarnTask { if (!sparkParameters.checkParameters()) { throw new RuntimeException("spark task params is not valid"); } - sparkParameters.setQueue(sparkTaskExecutionContext.getQueue()); + sparkParameters.setQueue(taskExecutionContext.getQueue()); setMainJarName(); } /** * create command - * * @return command */ @Override @@ -98,7 +97,7 @@ public class SparkTask extends AbstractYarnTask { // spark-submit [options] [app arguments] List args = new ArrayList<>(); - //spark version + // spark version String sparkCommand = SPARK2_COMMAND; if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) { @@ -111,11 +110,11 @@ public class SparkTask extends AbstractYarnTask { args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()), - sparkTaskExecutionContext.getDefinedParams(), + Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), + taskExecutionContext.getDefinedParams(), sparkParameters.getLocalParametersMap(), - CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()), - sparkTaskExecutionContext.getScheduleTime()); + CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), + taskExecutionContext.getScheduleTime()); String command = null; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java index dd12029765..88437a1102 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java @@ -53,10 +53,8 @@ public class FlinkArgsUtilsTest { public String others = "-s hdfs:///flink/savepoint-1537"; public String flinkVersion = "<1.10"; - @Before - public void setUp() throws Exception { - + public void setUp() { ResourceInfo main = new ResourceInfo(); main.setRes("testflink-1.0.0-SNAPSHOT.jar"); mainJar = main; @@ -67,7 +65,6 @@ public class FlinkArgsUtilsTest { */ @Test public void testBuildArgs() { - //Define params FlinkParameters param = new FlinkParameters(); param.setDeployMode(mode); @@ -134,6 +131,6 @@ public class FlinkArgsUtilsTest { param1.setDeployMode(mode); result = FlinkArgsUtils.buildArgs(param1); assertEquals(5, result.size()); - } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java new file mode 100644 index 0000000000..eb68672e07 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import static org.junit.Assert.assertEquals; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProgramType; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; + +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test MapReduceArgsUtils + */ +public class MapReduceArgsUtilsTest { + + private static final Logger logger = LoggerFactory.getLogger(MapReduceArgsUtilsTest.class); + + public String mainClass = "com.examples.WordCount"; + public ResourceInfo mainJar = null; + public String mainArgs = "/user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt"; + public ProgramType programType = ProgramType.JAVA; + public String others = "-files cachefile.txt -libjars mylib.jar -archives myarchive.zip -Dwordcount.case.sensitive=false"; + public String appName = "mapreduce test"; + public String queue = "queue1"; + + @Before + public void setUp() { + ResourceInfo main = new ResourceInfo(); + main.setRes("testspark-1.0.0-SNAPSHOT.jar"); + mainJar = main; + } + + /** + * Test buildArgs + */ + @Test + public void testBuildArgs() { + //Define params + MapReduceParameters param = new MapReduceParameters(); + param.setMainClass(mainClass); + param.setMainJar(mainJar); + param.setMainArgs(mainArgs); + param.setProgramType(programType); + param.setOthers(others); + param.setAppName(appName); + param.setQueue(queue); + + //Invoke buildArgs + List result = MapReduceArgsUtils.buildArgs(param); + for (String s : result) { + logger.info(s); + } + + //Expected values and order + assertEquals(7, result.size()); + + assertEquals("jar", result.get(0)); + assertEquals(mainJar.getRes(), result.get(1)); + assertEquals(mainClass, result.get(2)); + assertEquals(String.format("-D%s=%s", Constants.MR_NAME, ArgsUtils.escape(appName)), result.get(3)); + assertEquals(String.format("-D%s=%s", Constants.MR_QUEUE, queue), result.get(4)); + assertEquals(others, result.get(5)); + assertEquals(mainArgs, result.get(6)); + + //Others param without --queue + param.setOthers("-files xxx/hive-site.xml"); + param.setQueue(null); + result = MapReduceArgsUtils.buildArgs(param); + assertEquals(6, result.size()); + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java index f76c2eaa31..23a277c075 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java @@ -64,7 +64,6 @@ public class SparkArgsUtilsTest { */ @Test public void testBuildArgs() { - //Define params SparkParameters param = new SparkParameters(); param.setDeployMode(mode); @@ -130,4 +129,5 @@ public class SparkArgsUtilsTest { result = SparkArgsUtils.buildArgs(param1); assertEquals(7, result.size()); } -} \ No newline at end of file + +} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue index d21611e960..b0832b7210 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue @@ -49,6 +49,18 @@ + +
{{$t('App Name')}}
+
+ + +
+
{{$t('Main Arguments')}}
@@ -125,6 +137,8 @@ cacheResourceList: [], // Custom parameter localParams: [], + // MR app name + appName: '', // Main arguments mainArgs: '', // Option parameters @@ -293,6 +307,7 @@ return {id: v} }), localParams: this.localParams, + appName: this.appName, mainArgs: this.mainArgs, others: this.others, programType: this.programType @@ -348,6 +363,7 @@ }, resourceList: resourceIdArr, localParams: this.localParams, + appName: this.appName, mainArgs: this.mainArgs, others: this.others, programType: this.programType @@ -384,6 +400,7 @@ } else { this.mainJar = o.params.mainJar.id || '' } + this.appName = o.params.appName || '' this.mainArgs = o.params.mainArgs || '' this.others = o.params.others this.programType = o.params.programType || 'JAVA' diff --git a/pom.xml b/pom.xml index 31968fb96d..aff1f98034 100644 --- a/pom.xml +++ b/pom.xml @@ -820,6 +820,7 @@ **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java **/server/utils/FlinkArgsUtilsTest.java + **/server/utils/MapReduceArgsUtilsTest.java **/server/utils/ParamUtilsTest.java **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java