From 0b69236b04f709dd241e97482a2ac515475827b1 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Fri, 7 Jul 2023 14:24:47 +0800 Subject: [PATCH] [Feature-14251][Task] Support yarn queue definition in yarn task (#14310) --- docs/docs/en/guide/task/flink.md | 1 + docs/docs/en/guide/task/map-reduce.md | 1 + docs/docs/en/guide/task/spark.md | 1 + docs/docs/zh/guide/task/flink.md | 1 + docs/docs/zh/guide/task/map-reduce.md | 1 + docs/docs/zh/guide/task/spark.md | 1 + .../builder/TaskExecutionContextBuilder.java | 1 - .../plugin/task/api/TaskExecutionContext.java | 5 --- .../dataquality/spark/SparkParameters.java | 10 +++--- .../plugin/task/dq/DataQualityTask.java | 4 --- .../plugin/task/dq/utils/SparkArgsUtils.java | 2 +- .../plugin/task/flink/FlinkStreamTask.java | 1 - .../plugin/task/flink/FlinkArgsUtils.java | 26 +++++++-------- .../plugin/task/flink/FlinkConstants.java | 4 +-- .../plugin/task/flink/FlinkParameters.java | 10 +++--- .../plugin/task/flink/FlinkTask.java | 1 - .../plugin/task/mr/MapReduceArgsUtils.java | 10 +++--- .../plugin/task/mr/MapReduceParameters.java | 14 ++++---- .../plugin/task/mr/MapReduceTask.java | 2 -- .../task/mr/MapReduceTaskConstants.java | 2 +- .../plugin/task/spark/SparkConstants.java | 2 +- .../plugin/task/spark/SparkParameters.java | 10 +++--- .../plugin/task/spark/SparkTask.java | 11 +++---- .../src/locales/en_US/project.ts | 2 ++ .../src/locales/zh_CN/project.ts | 2 ++ .../task/components/node/fields/index.ts | 1 + .../task/components/node/fields/use-flink.ts | 3 +- .../task/components/node/fields/use-mr.ts | 3 +- .../task/components/node/fields/use-queue.ts | 32 +++++++++++++++++++ .../task/components/node/fields/use-spark.ts | 4 ++- .../task/components/node/format-data.ts | 4 ++- .../components/node/tasks/use-data-quality.ts | 4 ++- .../components/node/tasks/use-flink-stream.ts | 3 +- .../task/components/node/tasks/use-flink.ts | 3 +- .../task/components/node/tasks/use-mr.ts | 3 +- .../task/components/node/tasks/use-spark.ts | 1 + .../projects/task/components/node/types.ts | 2 ++ 37 files changed, 115 insertions(+), 73 deletions(-) create mode 100644 dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-queue.ts diff --git a/docs/docs/en/guide/task/flink.md b/docs/docs/en/guide/task/flink.md index b3ec6cd48c..b57606d4d1 100644 --- a/docs/docs/en/guide/task/flink.md +++ b/docs/docs/en/guide/task/flink.md @@ -35,6 +35,7 @@ Flink task type, used to execute Flink programs. For Flink nodes: | TaskManager memory size | Used to set the size of taskManager memories, which can be set according to the actual production environment. | | Number of TaskManager | Used to set the number of taskManagers, which can be set according to the actual production environment. | | Parallelism | Used to set the degree of parallelism for executing Flink tasks. | +| Yarn queue | Used to set the yarn queue, use `default` queue by default. | | Main program parameters | Set the input parameters for the Flink program and support the substitution of custom parameter variables. | | Optional parameters | Support `--jar`, `--files`,` --archives`, `--conf` format. | | Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script. | diff --git a/docs/docs/en/guide/task/map-reduce.md b/docs/docs/en/guide/task/map-reduce.md index 5f9525d44d..9ed71bbab4 100644 --- a/docs/docs/en/guide/task/map-reduce.md +++ b/docs/docs/en/guide/task/map-reduce.md @@ -30,6 +30,7 @@ MapReduce(MR) task type used for executing MapReduce programs. For MapReduce nod | The class of the main function | The **full path** of Main Class, the entry point of the MapReduce program. | | Main jar package | The jar package of the MapReduce program. | | Task name | MapReduce task name. | +| Yarn queue | Used to set the yarn queue, use `default` queue by default. | | Command line parameters | Set the input parameters of the MapReduce program and support the substitution of custom parameter variables. | | Other parameters | Support `-D`, `-files`, `-libjars`, `-archives` format. | | User-defined parameter | It is a local user-defined parameter for MapReduce, and will replace the content with `${variable}` in the script. | diff --git a/docs/docs/en/guide/task/spark.md b/docs/docs/en/guide/task/spark.md index 3d83d967e1..88e6c61943 100644 --- a/docs/docs/en/guide/task/spark.md +++ b/docs/docs/en/guide/task/spark.md @@ -33,6 +33,7 @@ Spark task type for executing Spark application. When executing the Spark task, | Driver memory size | Set the size of Driver memories, which can be set according to the actual production environment. | | Number of Executor | Set the number of Executor, which can be set according to the actual production environment. | | Executor memory size | Set the size of Executor memories, which can be set according to the actual production environment. | +| Yarn queue | Set the yarn queue, use `default` queue by default. | | Main program parameters | Set the input parameters of the Spark program and support the substitution of custom parameter variables. | | Optional parameters | Support `--jars`, `--files`,` --archives`, `--conf` format. | | Resource | Appoint resource files in the `Resource` if parameters refer to them. | diff --git a/docs/docs/zh/guide/task/flink.md b/docs/docs/zh/guide/task/flink.md index d58bd9d28c..b15078e191 100644 --- a/docs/docs/zh/guide/task/flink.md +++ b/docs/docs/zh/guide/task/flink.md @@ -35,6 +35,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点: | taskManager 内存数 | 用于设置 taskManager 内存数,可根据实际生产环境设置对应的内存数 | | taskManager 数量 | 用于设置 taskManager 的数量,可根据实际生产环境设置对应的数量 | | 并行度 | 用于设置执行 Flink 任务的并行度 | +| Yarn 队列 | 用于设置 Yarn 队列,默认使用 default 队列 | | 主程序参数 | 设置 Flink 程序的输入参数,支持自定义参数变量的替换 | | 选项参数 | 支持 `--jar`、`--files`、`--archives`、`--conf` 格式 | | 自定义参数 | 是 Flink 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容 | diff --git a/docs/docs/zh/guide/task/map-reduce.md b/docs/docs/zh/guide/task/map-reduce.md index 84e9816216..5eb9b7cd1d 100644 --- a/docs/docs/zh/guide/task/map-reduce.md +++ b/docs/docs/zh/guide/task/map-reduce.md @@ -24,6 +24,7 @@ MapReduce(MR) 任务类型,用于执行 MapReduce 程序。对于 MapReduce | 主函数的 Class | 是 MapReduce 程序的入口 Main Class 的**全路径** | | 主程序包 | 执行 MapReduce 程序的 jar 包 | | 任务名称(选填) | MapReduce 任务名称 | +| Yarn 队列 | 设置 Yarn 队列,默认使用 default | | 命令行参数 | 是设置 MapReduce 程序的输入参数,支持自定义参数变量的替换 | | 其他参数 | 支持 –D、-files、-libjars、-archives 格式 | | 自定义参数 | 是 MapReduce 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容 | diff --git a/docs/docs/zh/guide/task/spark.md b/docs/docs/zh/guide/task/spark.md index b09d3241a5..641b6dcbf9 100644 --- a/docs/docs/zh/guide/task/spark.md +++ b/docs/docs/zh/guide/task/spark.md @@ -32,6 +32,7 @@ Spark 任务类型用于执行 Spark 应用。对于 Spark 节点,worker 支 - Driver 内存数:用于设置 Driver 内存数,可根据实际生产环境设置对应的内存数。 - Executor 数量:用于设置 Executor 的数量,可根据实际生产环境设置对应的内存数。 - Executor 内存数:用于设置 Executor 内存数,可根据实际生产环境设置对应的内存数。 +- Yarn 队列:用于设置 Yarn 队列,默认使用 default 队列。 - 主程序参数:设置 Spark 程序的输入参数,支持自定义参数变量的替换。 - 选项参数:支持 `--jars`、`--files`、`--archives`、`--conf` 格式。 - 资源:如果其他参数中引用了资源文件,需要在资源中选择指定。 diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java index 6a4e1c628b..9090130eff 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java @@ -106,7 +106,6 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setExecutorId(processInstance.getExecutorId()); taskExecutionContext.setCmdTypeIfComplement(processInstance.getCmdTypeIfComplement().getCode()); taskExecutionContext.setTenantCode(processInstance.getTenantCode()); - taskExecutionContext.setQueue(processInstance.getQueue()); return this; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index 80a84db327..b8d125e5a9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -146,11 +146,6 @@ public class TaskExecutionContext implements Serializable { */ private String tenantCode; - /** - * task queue - */ - private String queue; - /** * process define id */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/dataquality/spark/SparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/dataquality/spark/SparkParameters.java index 7b6fc3f0b5..1b1f2cc8ef 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/dataquality/spark/SparkParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/dataquality/spark/SparkParameters.java @@ -82,7 +82,7 @@ public class SparkParameters extends AbstractParameters { /** * The YARN queue to submit to */ - private String queue; + private String yarnQueue; /** * other arguments @@ -180,12 +180,12 @@ public class SparkParameters extends AbstractParameters { this.appName = appName; } - public String getQueue() { - return queue; + public String getYarnQueue() { + return yarnQueue; } - public void setQueue(String queue) { - this.queue = queue; + public void setYarnQueue(String yarnQueue) { + this.yarnQueue = yarnQueue; } public String getOthers() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java index 95617b5a88..5fe451d916 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java @@ -121,10 +121,6 @@ public class DataQualityTask extends AbstractYarnTask { StringEscapeUtils.escapeJava(JSONUtils.toJsonString(dataQualityConfiguration))) + "\""); - dataQualityParameters - .getSparkParameters() - .setQueue(dqTaskExecutionContext.getQueue()); - setMainJarName(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java index d576abe6d0..699fc98d76 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java @@ -106,7 +106,7 @@ public class SparkArgsUtils { String others = param.getOthers(); if (!SPARK_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) { - String queue = param.getQueue(); + String queue = param.getYarnQueue(); if (StringUtils.isNotEmpty(queue)) { args.add(SparkConstants.SPARK_QUEUE); args.add(queue); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java index 4fc391c839..bc53696077 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java @@ -56,7 +56,6 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask { if (flinkParameters == null || !flinkParameters.checkParameters()) { throw new RuntimeException("flink task params is not valid"); } - flinkParameters.setQueue(taskExecutionContext.getQueue()); FileUtils.generateScriptFile(taskExecutionContext, flinkParameters); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index a96cddfd08..6937db6b78 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -163,9 +163,9 @@ public class FlinkArgsUtils { } // yarn.application.queue - String queue = flinkParameters.getQueue(); - if (StringUtils.isNotEmpty(queue)) { - initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue)); + String yarnQueue = flinkParameters.getYarnQueue(); + if (StringUtils.isNotEmpty(yarnQueue)) { + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, yarnQueue)); } } @@ -306,26 +306,26 @@ public class FlinkArgsUtils { case CLUSTER: if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion) || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) { - doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS); } else { - doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE); + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); } case APPLICATION: - doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS); } } private static void doAddQueue(List args, FlinkParameters flinkParameters, String option) { String others = flinkParameters.getOthers(); if (StringUtils.isEmpty(others) || !others.contains(option)) { - String queue = flinkParameters.getQueue(); - if (StringUtils.isNotEmpty(queue)) { + String yarnQueue = flinkParameters.getYarnQueue(); + if (StringUtils.isNotEmpty(yarnQueue)) { switch (option) { - case FlinkConstants.FLINK_QUEUE_FOR_TARGETS: - args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue)); - case FlinkConstants.FLINK_QUEUE_FOR_MODE: - args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE); - args.add(queue); + case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS: + args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue)); + case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE: + args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); + args.add(yarnQueue); } } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java index 6e8d51b2ce..b2d7607761 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java @@ -48,8 +48,8 @@ public class FlinkConstants { public static final String FLINK_EXECUTION_TARGET = "-t"; public static final String FLINK_YARN_SLOT = "-ys"; public static final String FLINK_APP_NAME = "-ynm"; - public static final String FLINK_QUEUE_FOR_MODE = "-yqu"; - public static final String FLINK_QUEUE_FOR_TARGETS = "-Dyarn.application.queue"; + public static final String FLINK_YARN_QUEUE_FOR_MODE = "-yqu"; + public static final String FLINK_YARN_QUEUE_FOR_TARGETS = "-Dyarn.application.queue"; public static final String FLINK_TASK_MANAGE = "-yn"; public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; public static final String FLINK_TASK_MANAGE_MEM = "-ytm"; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java index 8b45641989..61ad5d4772 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java @@ -86,7 +86,7 @@ public class FlinkParameters extends AbstractParameters { /** * The YARN queue to submit to */ - private String queue; + private String yarnQueue; /** * other arguments @@ -194,12 +194,12 @@ public class FlinkParameters extends AbstractParameters { this.taskManagerMemory = taskManagerMemory; } - public String getQueue() { - return queue; + public String getYarnQueue() { + return yarnQueue; } - public void setQueue(String queue) { - this.queue = queue; + public void setYarnQueue(String yarnQueue) { + this.yarnQueue = yarnQueue; } public List getResourceList() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index abf0da2c5c..33afd7fc7f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -59,7 +59,6 @@ public class FlinkTask extends AbstractYarnTask { if (flinkParameters == null || !flinkParameters.checkParameters()) { throw new RuntimeException("flink task params is not valid"); } - flinkParameters.setQueue(taskExecutionContext.getQueue()); FileUtils.generateScriptFile(taskExecutionContext, flinkParameters); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java index 070774adef..97e3bcc661 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.mr; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.D; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAR; import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_NAME; -import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_QUEUE; +import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_YARN_QUEUE; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; @@ -67,10 +67,10 @@ public class MapReduceArgsUtils { } String others = param.getOthers(); - if (StringUtils.isEmpty(others) || !others.contains(MR_QUEUE)) { - String queue = param.getQueue(); - if (StringUtils.isNotEmpty(queue)) { - args.add(String.format("%s%s=%s", D, MR_QUEUE, queue)); + if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) { + String yarnQueue = param.getYarnQueue(); + if (StringUtils.isNotEmpty(yarnQueue)) { + args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue)); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceParameters.java index 8a8296fa26..8d63b6c9fb 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceParameters.java @@ -54,9 +54,9 @@ public class MapReduceParameters extends AbstractParameters { private String appName; /** - * queue + * The YARN queue to submit to */ - private String queue; + private String yarnQueue; /** * resource list @@ -101,12 +101,12 @@ public class MapReduceParameters extends AbstractParameters { this.appName = appName; } - public String getQueue() { - return queue; + public String getYarnQueue() { + return yarnQueue; } - public void setQueue(String queue) { - this.queue = queue; + public void setYarnQueue(String yarnQueue) { + this.yarnQueue = yarnQueue; } public List getResourceList() { @@ -152,7 +152,7 @@ public class MapReduceParameters extends AbstractParameters { return "mainJar= " + mainJar + "mainClass=" + mainClass + "mainArgs=" + mainArgs - + "queue=" + queue + + "yarnQueue=" + yarnQueue + "other mainArgs=" + others; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java index e172488103..2ae5c6e707 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java @@ -70,8 +70,6 @@ public class MapReduceTask extends AbstractYarnTask { throw new RuntimeException("mapreduce task params is not valid"); } - mapreduceParameters.setQueue(taskExecutionContext.getQueue()); - // replace placeholder,and combine local and global parameters Map paramsMap = taskExecutionContext.getPrepareParamsMap(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTaskConstants.java index 0c438566ac..ca896aafa9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTaskConstants.java @@ -31,6 +31,6 @@ public class MapReduceTaskConstants { /** * -D mapreduce.job.queuename=queuename */ - public static final String MR_QUEUE = "mapreduce.job.queuename"; + public static final String MR_YARN_QUEUE = "mapreduce.job.queuename"; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java index 7ad2e27b07..d4230c9d8c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java @@ -36,7 +36,7 @@ public class SparkConstants { /** * --queue QUEUE */ - public static final String SPARK_QUEUE = "--queue"; + public static final String SPARK_YARN_QUEUE = "--queue"; public static final String DEPLOY_MODE = "--deploy-mode"; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java index aa7cd27c85..0bb40e7324 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java @@ -82,7 +82,7 @@ public class SparkParameters extends AbstractParameters { /** * The YARN queue to submit to */ - private String queue; + private String yarnQueue; /** * other arguments @@ -198,12 +198,12 @@ public class SparkParameters extends AbstractParameters { this.appName = appName; } - public String getQueue() { - return queue; + public String getYarnQueue() { + return yarnQueue; } - public void setQueue(String queue) { - this.queue = queue; + public void setYarnQueue(String yarnQueue) { + this.yarnQueue = yarnQueue; } public String getOthers() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index afe2c1d223..0a27d9011e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -81,7 +81,6 @@ public class SparkTask extends AbstractYarnTask { if (!sparkParameters.checkParameters()) { throw new RuntimeException("spark task params is not valid"); } - sparkParameters.setQueue(taskExecutionContext.getQueue()); log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters)); } @@ -165,11 +164,11 @@ public class SparkTask extends AbstractYarnTask { String others = sparkParameters.getOthers(); if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode) - && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) { - String queue = sparkParameters.getQueue(); - if (StringUtils.isNotEmpty(queue)) { - args.add(SparkConstants.SPARK_QUEUE); - args.add(queue); + && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_YARN_QUEUE))) { + String yarnQueue = sparkParameters.getYarnQueue(); + if (StringUtils.isNotEmpty(yarnQueue)) { + args.add(SparkConstants.SPARK_YARN_QUEUE); + args.add(yarnQueue); } } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 091af80c99..cff8a39789 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -854,6 +854,8 @@ export default { dynamic_separator_tips: 'separator(required)', child_node_definition: 'child node definition', child_node_instance: 'child node instance', + yarn_queue: 'Yarn Queue', + yarn_queue_tips: 'Please input yarn queue(optional)', }, menu: { fav: 'Favorites', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index c62f792bfe..edc3d98943 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -830,6 +830,8 @@ export default { dynamic_separator_tips: '分隔符(必填)', child_node_definition: '子节点定义', child_node_instance: '子节点实例', + yarn_queue: 'Yarn队列', + yarn_queue_tips: '请输入Yarn队列(选填)', }, menu: { fav: '收藏组件', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts index f56e6967f1..57d168867e 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts @@ -90,3 +90,4 @@ export { useLinkis } from './use-linkis' export { useDataFactory } from './use-data-factory' export { useRemoteShell } from './use-remote-shell' export { useDynamic } from './use-dynamic' +export { useYarnQueue } from './use-queue' diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts index 5c66dd15f9..aaedf5d2aa 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts @@ -16,7 +16,7 @@ */ import { computed, watch, watchEffect } from 'vue' import { useI18n } from 'vue-i18n' -import { useCustomParams, useMainJar, useResources } from '.' +import { useCustomParams, useMainJar, useResources, useYarnQueue } from '.' import type { IJsonItem } from '../types' export function useFlink(model: { [field: string]: any }): IJsonItem[] { @@ -282,6 +282,7 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] { }, value: model.parallelism }, + useYarnQueue(), { type: 'input', field: 'mainArgs', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-mr.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-mr.ts index 83c9996529..9850db57c2 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-mr.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-mr.ts @@ -16,7 +16,7 @@ */ import { computed } from 'vue' import { useI18n } from 'vue-i18n' -import { useCustomParams, useMainJar, useResources } from '.' +import { useCustomParams, useMainJar, useResources, useYarnQueue } from '.' import type { IJsonItem } from '../types' export function useMr(model: { [field: string]: any }): IJsonItem[] { @@ -68,6 +68,7 @@ export function useMr(model: { [field: string]: any }): IJsonItem[] { placeholder: t('project.node.app_name_tips') } }, + useYarnQueue(), { type: 'input', field: 'mainArgs', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-queue.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-queue.ts new file mode 100644 index 0000000000..c4ca37375b --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-queue.ts @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { useI18n } from 'vue-i18n' +import type { IJsonItem } from '../types' + +export function useYarnQueue(): IJsonItem { + const { t } = useI18n() + + return { + type: 'input', + field: 'yarnQueue', + name: t('project.node.yarn_queue'), + span: 12, + props: { + placeholder: t('project.node.yarn_queue_tips') + }, + } +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts index 7c68a25920..72e442f9fc 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts @@ -26,7 +26,8 @@ import { useExecutorCores, useMainJar, useNamespace, - useResources + useResources, + useYarnQueue } from '.' import type { IJsonItem } from '../types' @@ -105,6 +106,7 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] { useExecutorNumber(), useExecutorMemory(), useExecutorCores(), + useYarnQueue(), { type: 'input', field: 'mainArgs', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 914c1edb12..e6d87298d0 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -62,6 +62,7 @@ export function formatParams(data: INodeData): { if (data.namespace) { taskParams.namespace = data.namespace } + taskParams.yarnQueue = data.yarnQueue } if (data.taskType === 'SPARK') { @@ -321,7 +322,8 @@ export function formatParams(data: INodeData): { executorCores: data.executorCores, executorMemory: data.executorMemory, numExecutors: data.numExecutors, - others: data.others + others: data.others, + yarnQueue: data.yarnQueue } } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts index 0753e60d84..457fc3afa6 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts @@ -57,7 +57,8 @@ export function useDataQuality({ numExecutors: 2, executorMemory: '2G', executorCores: 2, - others: '--conf spark.yarn.maxAppAttempts=1' + others: '--conf spark.yarn.maxAppAttempts=1', + yarnQueue: '' } as INodeData) return { @@ -84,6 +85,7 @@ export function useDataQuality({ Fields.useExecutorNumber(), Fields.useExecutorMemory(), Fields.useExecutorCores(), + Fields.useYarnQueue(), { type: 'input', field: 'others', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts index 315514f238..d2186f4466 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts @@ -53,7 +53,8 @@ export function useFlinkStream({ slot: 1, taskManager: 2, parallelism: 1, - timeoutNotifyStrategy: ['WARN'] + timeoutNotifyStrategy: ['WARN'], + yarnQueue: '' }) return { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts index 21a81046cc..adb7dd39f7 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts @@ -53,7 +53,8 @@ export function useFlink({ slot: 1, taskManager: 2, parallelism: 1, - timeoutNotifyStrategy: ['WARN'] + timeoutNotifyStrategy: ['WARN'], + yarnQueue: '' }) return { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts index c0b5b79b09..13eec1c0d6 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts @@ -44,7 +44,8 @@ export function useMr({ delayTime: 0, timeout: 30, programType: 'SCALA', - timeoutNotifyStrategy: ['WARN'] + timeoutNotifyStrategy: ['WARN'], + yarnQueue: '', } as INodeData) return { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts index 9d74dcffb3..0fc42445c4 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts @@ -51,6 +51,7 @@ export function useSpark({ numExecutors: 2, executorMemory: '2G', executorCores: 2, + yarnQueue: '', timeoutNotifyStrategy: ['WARN'] } as INodeData) diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index e67eecb4f0..db19911e32 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -223,6 +223,7 @@ interface ISparkParameters { executorMemory?: string numExecutors?: number others?: string + yarnQueue?: string } interface IRuleParameters { @@ -441,6 +442,7 @@ interface ITaskParams { degreeOfParallelism?: number filterCondition?: string listParameters?: Array + yarnQueue?: string } interface INodeData