Browse Source

[Feature-14251][Task] Support yarn queue definition in yarn task (#14310)

3.2.1-prepare
Aaron Wang 1 year ago committed by GitHub
parent
commit
0b69236b04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      docs/docs/en/guide/task/flink.md
  2. 1
      docs/docs/en/guide/task/map-reduce.md
  3. 1
      docs/docs/en/guide/task/spark.md
  4. 1
      docs/docs/zh/guide/task/flink.md
  5. 1
      docs/docs/zh/guide/task/map-reduce.md
  6. 1
      docs/docs/zh/guide/task/spark.md
  7. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
  8. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
  9. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/dataquality/spark/SparkParameters.java
  10. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
  11. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java
  12. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
  13. 26
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
  14. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
  15. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
  16. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
  17. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java
  18. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceParameters.java
  19. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
  20. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTaskConstants.java
  21. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
  22. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
  23. 11
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
  24. 2
      dolphinscheduler-ui/src/locales/en_US/project.ts
  25. 2
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  26. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
  27. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
  28. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-mr.ts
  29. 32
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-queue.ts
  30. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
  31. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  32. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts
  33. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts
  34. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts
  35. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts
  36. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
  37. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

1
docs/docs/en/guide/task/flink.md

@@ -35,6 +35,7 @@ Flink task type, used to execute Flink programs. For Flink nodes:
| TaskManager memory size | Used to set the size of taskManager memories, which can be set according to the actual production environment. | | TaskManager memory size | Used to set the size of taskManager memories, which can be set according to the actual production environment. |
| Number of TaskManager | Used to set the number of taskManagers, which can be set according to the actual production environment. | | Number of TaskManager | Used to set the number of taskManagers, which can be set according to the actual production environment. |
| Parallelism | Used to set the degree of parallelism for executing Flink tasks. | | Parallelism | Used to set the degree of parallelism for executing Flink tasks. |
| Yarn queue | Used to set the yarn queue, use `default` queue by default. |
| Main program parameters | Set the input parameters for the Flink program and support the substitution of custom parameter variables. | | Main program parameters | Set the input parameters for the Flink program and support the substitution of custom parameter variables. |
| Optional parameters | Support `--jar`, `--files`,` --archives`, `--conf` format. | | Optional parameters | Support `--jar`, `--files`,` --archives`, `--conf` format. |
| Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script. | | Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script. |

1
docs/docs/en/guide/task/map-reduce.md

@@ -30,6 +30,7 @@ MapReduce(MR) task type used for executing MapReduce programs. For MapReduce nod
| The class of the main function | The **full path** of Main Class, the entry point of the MapReduce program. | | The class of the main function | The **full path** of Main Class, the entry point of the MapReduce program. |
| Main jar package | The jar package of the MapReduce program. | | Main jar package | The jar package of the MapReduce program. |
| Task name | MapReduce task name. | | Task name | MapReduce task name. |
| Yarn queue | Used to set the yarn queue, use `default` queue by default. |
| Command line parameters | Set the input parameters of the MapReduce program and support the substitution of custom parameter variables. | | Command line parameters | Set the input parameters of the MapReduce program and support the substitution of custom parameter variables. |
| Other parameters | Support `-D`, `-files`, `-libjars`, `-archives` format. | | Other parameters | Support `-D`, `-files`, `-libjars`, `-archives` format. |
| User-defined parameter | It is a local user-defined parameter for MapReduce, and will replace the content with `${variable}` in the script. | | User-defined parameter | It is a local user-defined parameter for MapReduce, and will replace the content with `${variable}` in the script. |

1
docs/docs/en/guide/task/spark.md

@@ -33,6 +33,7 @@ Spark task type for executing Spark application. When executing the Spark task,
| Driver memory size | Set the size of Driver memories, which can be set according to the actual production environment. | | Driver memory size | Set the size of Driver memories, which can be set according to the actual production environment. |
| Number of Executor | Set the number of Executor, which can be set according to the actual production environment. | | Number of Executor | Set the number of Executor, which can be set according to the actual production environment. |
| Executor memory size | Set the size of Executor memories, which can be set according to the actual production environment. | | Executor memory size | Set the size of Executor memories, which can be set according to the actual production environment. |
| Yarn queue | Set the yarn queue, use `default` queue by default. |
| Main program parameters | Set the input parameters of the Spark program and support the substitution of custom parameter variables. | | Main program parameters | Set the input parameters of the Spark program and support the substitution of custom parameter variables. |
| Optional parameters | Support `--jars`, `--files`,` --archives`, `--conf` format. | | Optional parameters | Support `--jars`, `--files`,` --archives`, `--conf` format. |
| Resource | Appoint resource files in the `Resource` if parameters refer to them. | | Resource | Appoint resource files in the `Resource` if parameters refer to them. |

1
docs/docs/zh/guide/task/flink.md

@@ -35,6 +35,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点:
| taskManager 内存数 | 用于设置 taskManager 内存数,可根据实际生产环境设置对应的内存数 | | taskManager 内存数 | 用于设置 taskManager 内存数,可根据实际生产环境设置对应的内存数 |
| taskManager 数量 | 用于设置 taskManager 的数量,可根据实际生产环境设置对应的数量 | | taskManager 数量 | 用于设置 taskManager 的数量,可根据实际生产环境设置对应的数量 |
| 并行度 | 用于设置执行 Flink 任务的并行度 | | 并行度 | 用于设置执行 Flink 任务的并行度 |
| Yarn 队列 | 用于设置 Yarn 队列,默认使用 default 队列 |
| 主程序参数 | 设置 Flink 程序的输入参数,支持自定义参数变量的替换 | | 主程序参数 | 设置 Flink 程序的输入参数,支持自定义参数变量的替换 |
| 选项参数 | 支持 `--jar`、`--files`、`--archives`、`--conf` 格式 | | 选项参数 | 支持 `--jar`、`--files`、`--archives`、`--conf` 格式 |
| 自定义参数 | 是 Flink 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容 | | 自定义参数 | 是 Flink 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容 |

1
docs/docs/zh/guide/task/map-reduce.md

@@ -24,6 +24,7 @@ MapReduce(MR) 任务类型,用于执行 MapReduce 程序。对于 MapReduce
| 主函数的 Class | 是 MapReduce 程序的入口 Main Class 的**全路径** | | 主函数的 Class | 是 MapReduce 程序的入口 Main Class 的**全路径** |
| 主程序包 | 执行 MapReduce 程序的 jar 包 | | 主程序包 | 执行 MapReduce 程序的 jar 包 |
| 任务名称(选填) | MapReduce 任务名称 | | 任务名称(选填) | MapReduce 任务名称 |
| Yarn 队列 | 设置 Yarn 队列,默认使用 default |
| 命令行参数 | 是设置 MapReduce 程序的输入参数,支持自定义参数变量的替换 | | 命令行参数 | 是设置 MapReduce 程序的输入参数,支持自定义参数变量的替换 |
| 其他参数 | 支持 -D、-files、-libjars、-archives 格式 | | 其他参数 | 支持 -D、-files、-libjars、-archives 格式 |
| 自定义参数 | 是 MapReduce 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容 | | 自定义参数 | 是 MapReduce 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容 |

1
docs/docs/zh/guide/task/spark.md

@@ -32,6 +32,7 @@ Spark 任务类型用于执行 Spark 应用。对于 Spark 节点,worker 支
- Driver 内存数:用于设置 Driver 内存数,可根据实际生产环境设置对应的内存数。 - Driver 内存数:用于设置 Driver 内存数,可根据实际生产环境设置对应的内存数。
- Executor 数量:用于设置 Executor 的数量,可根据实际生产环境设置对应的内存数。 - Executor 数量:用于设置 Executor 的数量,可根据实际生产环境设置对应的内存数。
- Executor 内存数:用于设置 Executor 内存数,可根据实际生产环境设置对应的内存数。 - Executor 内存数:用于设置 Executor 内存数,可根据实际生产环境设置对应的内存数。
- Yarn 队列:用于设置 Yarn 队列,默认使用 default 队列。
- 主程序参数:设置 Spark 程序的输入参数,支持自定义参数变量的替换。 - 主程序参数:设置 Spark 程序的输入参数,支持自定义参数变量的替换。
- 选项参数:支持 `--jars`、`--files`、`--archives`、`--conf` 格式。 - 选项参数:支持 `--jars`、`--files`、`--archives`、`--conf` 格式。
- 资源:如果其他参数中引用了资源文件,需要在资源中选择指定。 - 资源:如果其他参数中引用了资源文件,需要在资源中选择指定。

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java

@ -106,7 +106,6 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setExecutorId(processInstance.getExecutorId()); taskExecutionContext.setExecutorId(processInstance.getExecutorId());
taskExecutionContext.setCmdTypeIfComplement(processInstance.getCmdTypeIfComplement().getCode()); taskExecutionContext.setCmdTypeIfComplement(processInstance.getCmdTypeIfComplement().getCode());
taskExecutionContext.setTenantCode(processInstance.getTenantCode()); taskExecutionContext.setTenantCode(processInstance.getTenantCode());
taskExecutionContext.setQueue(processInstance.getQueue());
return this; return this;
} }

5
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

@ -146,11 +146,6 @@ public class TaskExecutionContext implements Serializable {
*/ */
private String tenantCode; private String tenantCode;
/**
* task queue
*/
private String queue;
/** /**
* process define id * process define id
*/ */

10
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/dataquality/spark/SparkParameters.java

@ -82,7 +82,7 @@ public class SparkParameters extends AbstractParameters {
/** /**
* The YARN queue to submit to * The YARN queue to submit to
*/ */
private String queue; private String yarnQueue;
/** /**
* other arguments * other arguments
@ -180,12 +180,12 @@ public class SparkParameters extends AbstractParameters {
this.appName = appName; this.appName = appName;
} }
public String getQueue() { public String getYarnQueue() {
return queue; return yarnQueue;
} }
public void setQueue(String queue) { public void setYarnQueue(String yarnQueue) {
this.queue = queue; this.yarnQueue = yarnQueue;
} }
public String getOthers() { public String getOthers() {

4
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java

@ -121,10 +121,6 @@ public class DataQualityTask extends AbstractYarnTask {
StringEscapeUtils.escapeJava(JSONUtils.toJsonString(dataQualityConfiguration))) StringEscapeUtils.escapeJava(JSONUtils.toJsonString(dataQualityConfiguration)))
+ "\""); + "\"");
dataQualityParameters
.getSparkParameters()
.setQueue(dqTaskExecutionContext.getQueue());
setMainJarName(); setMainJarName();
} }

2
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java

@ -106,7 +106,7 @@ public class SparkArgsUtils {
String others = param.getOthers(); String others = param.getOthers();
if (!SPARK_LOCAL.equals(deployMode) if (!SPARK_LOCAL.equals(deployMode)
&& (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) { && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) {
String queue = param.getQueue(); String queue = param.getYarnQueue();
if (StringUtils.isNotEmpty(queue)) { if (StringUtils.isNotEmpty(queue)) {
args.add(SparkConstants.SPARK_QUEUE); args.add(SparkConstants.SPARK_QUEUE);
args.add(queue); args.add(queue);

1
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

@ -56,7 +56,6 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
if (flinkParameters == null || !flinkParameters.checkParameters()) { if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid"); throw new RuntimeException("flink task params is not valid");
} }
flinkParameters.setQueue(taskExecutionContext.getQueue());
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters); FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
} }

26
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java

@ -163,9 +163,9 @@ public class FlinkArgsUtils {
} }
// yarn.application.queue // yarn.application.queue
String queue = flinkParameters.getQueue(); String yarnQueue = flinkParameters.getYarnQueue();
if (StringUtils.isNotEmpty(queue)) { if (StringUtils.isNotEmpty(yarnQueue)) {
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue)); initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, yarnQueue));
} }
} }
@ -306,26 +306,26 @@ public class FlinkArgsUtils {
case CLUSTER: case CLUSTER:
if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion) if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
|| FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) { || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
} else { } else {
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE); doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
} }
case APPLICATION: case APPLICATION:
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
} }
} }
private static void doAddQueue(List<String> args, FlinkParameters flinkParameters, String option) { private static void doAddQueue(List<String> args, FlinkParameters flinkParameters, String option) {
String others = flinkParameters.getOthers(); String others = flinkParameters.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(option)) { if (StringUtils.isEmpty(others) || !others.contains(option)) {
String queue = flinkParameters.getQueue(); String yarnQueue = flinkParameters.getYarnQueue();
if (StringUtils.isNotEmpty(queue)) { if (StringUtils.isNotEmpty(yarnQueue)) {
switch (option) { switch (option) {
case FlinkConstants.FLINK_QUEUE_FOR_TARGETS: case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS:
args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue)); args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue));
case FlinkConstants.FLINK_QUEUE_FOR_MODE: case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE:
args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE); args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
args.add(queue); args.add(yarnQueue);
} }
} }
} }

4
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

@ -48,8 +48,8 @@ public class FlinkConstants {
public static final String FLINK_EXECUTION_TARGET = "-t"; public static final String FLINK_EXECUTION_TARGET = "-t";
public static final String FLINK_YARN_SLOT = "-ys"; public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm"; public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE_FOR_MODE = "-yqu"; public static final String FLINK_YARN_QUEUE_FOR_MODE = "-yqu";
public static final String FLINK_QUEUE_FOR_TARGETS = "-Dyarn.application.queue"; public static final String FLINK_YARN_QUEUE_FOR_TARGETS = "-Dyarn.application.queue";
public static final String FLINK_TASK_MANAGE = "-yn"; public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm"; public static final String FLINK_TASK_MANAGE_MEM = "-ytm";

10
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java

@ -86,7 +86,7 @@ public class FlinkParameters extends AbstractParameters {
/** /**
* The YARN queue to submit to * The YARN queue to submit to
*/ */
private String queue; private String yarnQueue;
/** /**
* other arguments * other arguments
@ -194,12 +194,12 @@ public class FlinkParameters extends AbstractParameters {
this.taskManagerMemory = taskManagerMemory; this.taskManagerMemory = taskManagerMemory;
} }
public String getQueue() { public String getYarnQueue() {
return queue; return yarnQueue;
} }
public void setQueue(String queue) { public void setYarnQueue(String yarnQueue) {
this.queue = queue; this.yarnQueue = yarnQueue;
} }
public List<ResourceInfo> getResourceList() { public List<ResourceInfo> getResourceList() {

1
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

@ -59,7 +59,6 @@ public class FlinkTask extends AbstractYarnTask {
if (flinkParameters == null || !flinkParameters.checkParameters()) { if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid"); throw new RuntimeException("flink task params is not valid");
} }
flinkParameters.setQueue(taskExecutionContext.getQueue());
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters); FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
} }

10
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.mr;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.D; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.D;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAR; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAR;
import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_NAME; import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_NAME;
import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_QUEUE; import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_YARN_QUEUE;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
@ -67,10 +67,10 @@ public class MapReduceArgsUtils {
} }
String others = param.getOthers(); String others = param.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(MR_QUEUE)) { if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {
String queue = param.getQueue(); String yarnQueue = param.getYarnQueue();
if (StringUtils.isNotEmpty(queue)) { if (StringUtils.isNotEmpty(yarnQueue)) {
args.add(String.format("%s%s=%s", D, MR_QUEUE, queue)); args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
} }
} }

14
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceParameters.java

@ -54,9 +54,9 @@ public class MapReduceParameters extends AbstractParameters {
private String appName; private String appName;
/** /**
* queue * The YARN queue to submit to
*/ */
private String queue; private String yarnQueue;
/** /**
* resource list * resource list
@ -101,12 +101,12 @@ public class MapReduceParameters extends AbstractParameters {
this.appName = appName; this.appName = appName;
} }
public String getQueue() { public String getYarnQueue() {
return queue; return yarnQueue;
} }
public void setQueue(String queue) { public void setYarnQueue(String yarnQueue) {
this.queue = queue; this.yarnQueue = yarnQueue;
} }
public List<ResourceInfo> getResourceList() { public List<ResourceInfo> getResourceList() {
@ -152,7 +152,7 @@ public class MapReduceParameters extends AbstractParameters {
return "mainJar= " + mainJar return "mainJar= " + mainJar
+ "mainClass=" + mainClass + "mainClass=" + mainClass
+ "mainArgs=" + mainArgs + "mainArgs=" + mainArgs
+ "queue=" + queue + "yarnQueue=" + yarnQueue
+ "other mainArgs=" + others; + "other mainArgs=" + others;
} }
} }

2
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java

@ -70,8 +70,6 @@ public class MapReduceTask extends AbstractYarnTask {
throw new RuntimeException("mapreduce task params is not valid"); throw new RuntimeException("mapreduce task params is not valid");
} }
mapreduceParameters.setQueue(taskExecutionContext.getQueue());
// replace placeholder,and combine local and global parameters // replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap(); Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();

2
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTaskConstants.java

@ -31,6 +31,6 @@ public class MapReduceTaskConstants {
/** /**
* -D mapreduce.job.queuename=queuename * -D mapreduce.job.queuename=queuename
*/ */
public static final String MR_QUEUE = "mapreduce.job.queuename"; public static final String MR_YARN_QUEUE = "mapreduce.job.queuename";
} }

2
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java

@ -36,7 +36,7 @@ public class SparkConstants {
/** /**
* --queue QUEUE * --queue QUEUE
*/ */
public static final String SPARK_QUEUE = "--queue"; public static final String SPARK_YARN_QUEUE = "--queue";
public static final String DEPLOY_MODE = "--deploy-mode"; public static final String DEPLOY_MODE = "--deploy-mode";

10
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java

@ -82,7 +82,7 @@ public class SparkParameters extends AbstractParameters {
/** /**
* The YARN queue to submit to * The YARN queue to submit to
*/ */
private String queue; private String yarnQueue;
/** /**
* other arguments * other arguments
@ -198,12 +198,12 @@ public class SparkParameters extends AbstractParameters {
this.appName = appName; this.appName = appName;
} }
public String getQueue() { public String getYarnQueue() {
return queue; return yarnQueue;
} }
public void setQueue(String queue) { public void setYarnQueue(String yarnQueue) {
this.queue = queue; this.yarnQueue = yarnQueue;
} }
public String getOthers() { public String getOthers() {

11
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

@ -81,7 +81,6 @@ public class SparkTask extends AbstractYarnTask {
if (!sparkParameters.checkParameters()) { if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid"); throw new RuntimeException("spark task params is not valid");
} }
sparkParameters.setQueue(taskExecutionContext.getQueue());
log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters)); log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters));
} }
@ -165,11 +164,11 @@ public class SparkTask extends AbstractYarnTask {
String others = sparkParameters.getOthers(); String others = sparkParameters.getOthers();
if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode) if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)
&& (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) { && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_YARN_QUEUE))) {
String queue = sparkParameters.getQueue(); String yarnQueue = sparkParameters.getYarnQueue();
if (StringUtils.isNotEmpty(queue)) { if (StringUtils.isNotEmpty(yarnQueue)) {
args.add(SparkConstants.SPARK_QUEUE); args.add(SparkConstants.SPARK_YARN_QUEUE);
args.add(queue); args.add(yarnQueue);
} }
} }

2
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -854,6 +854,8 @@ export default {
dynamic_separator_tips: 'separator(required)', dynamic_separator_tips: 'separator(required)',
child_node_definition: 'child node definition', child_node_definition: 'child node definition',
child_node_instance: 'child node instance', child_node_instance: 'child node instance',
yarn_queue: 'Yarn Queue',
yarn_queue_tips: 'Please input yarn queue(optional)',
}, },
menu: { menu: {
fav: 'Favorites', fav: 'Favorites',

2
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -830,6 +830,8 @@ export default {
dynamic_separator_tips: '分隔符(必填)', dynamic_separator_tips: '分隔符(必填)',
child_node_definition: '子节点定义', child_node_definition: '子节点定义',
child_node_instance: '子节点实例', child_node_instance: '子节点实例',
yarn_queue: 'Yarn队列',
yarn_queue_tips: '请输入Yarn队列(选填)',
}, },
menu: { menu: {
fav: '收藏组件', fav: '收藏组件',

1
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts

@ -90,3 +90,4 @@ export { useLinkis } from './use-linkis'
export { useDataFactory } from './use-data-factory' export { useDataFactory } from './use-data-factory'
export { useRemoteShell } from './use-remote-shell' export { useRemoteShell } from './use-remote-shell'
export { useDynamic } from './use-dynamic' export { useDynamic } from './use-dynamic'
export { useYarnQueue } from './use-queue'

3
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts

@ -16,7 +16,7 @@
*/ */
import { computed, watch, watchEffect } from 'vue' import { computed, watch, watchEffect } from 'vue'
import { useI18n } from 'vue-i18n' import { useI18n } from 'vue-i18n'
import { useCustomParams, useMainJar, useResources } from '.' import { useCustomParams, useMainJar, useResources, useYarnQueue } from '.'
import type { IJsonItem } from '../types' import type { IJsonItem } from '../types'
export function useFlink(model: { [field: string]: any }): IJsonItem[] { export function useFlink(model: { [field: string]: any }): IJsonItem[] {
@ -282,6 +282,7 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
}, },
value: model.parallelism value: model.parallelism
}, },
useYarnQueue(),
{ {
type: 'input', type: 'input',
field: 'mainArgs', field: 'mainArgs',

3
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-mr.ts

@ -16,7 +16,7 @@
*/ */
import { computed } from 'vue' import { computed } from 'vue'
import { useI18n } from 'vue-i18n' import { useI18n } from 'vue-i18n'
import { useCustomParams, useMainJar, useResources } from '.' import { useCustomParams, useMainJar, useResources, useYarnQueue } from '.'
import type { IJsonItem } from '../types' import type { IJsonItem } from '../types'
export function useMr(model: { [field: string]: any }): IJsonItem[] { export function useMr(model: { [field: string]: any }): IJsonItem[] {
@ -68,6 +68,7 @@ export function useMr(model: { [field: string]: any }): IJsonItem[] {
placeholder: t('project.node.app_name_tips') placeholder: t('project.node.app_name_tips')
} }
}, },
useYarnQueue(),
{ {
type: 'input', type: 'input',
field: 'mainArgs', field: 'mainArgs',

32
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-queue.ts

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useI18n } from 'vue-i18n'
import type { IJsonItem } from '../types'
/**
 * Builds the form-field descriptor for the task's YARN queue setting.
 *
 * Renders a half-width (span 12) free-text input bound to the `yarnQueue`
 * task field, with localized label and placeholder text.
 */
export function useYarnQueue(): IJsonItem {
  const { t } = useI18n()

  // Placeholder hints that the queue is optional; an empty value is allowed.
  const inputProps = {
    placeholder: t('project.node.yarn_queue_tips')
  }

  return {
    type: 'input',
    field: 'yarnQueue',
    span: 12,
    name: t('project.node.yarn_queue'),
    props: inputProps
  }
}

4
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts

@ -26,7 +26,8 @@ import {
useExecutorCores, useExecutorCores,
useMainJar, useMainJar,
useNamespace, useNamespace,
useResources useResources,
useYarnQueue
} from '.' } from '.'
import type { IJsonItem } from '../types' import type { IJsonItem } from '../types'
@ -105,6 +106,7 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
useExecutorNumber(), useExecutorNumber(),
useExecutorMemory(), useExecutorMemory(),
useExecutorCores(), useExecutorCores(),
useYarnQueue(),
{ {
type: 'input', type: 'input',
field: 'mainArgs', field: 'mainArgs',

4
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -62,6 +62,7 @@ export function formatParams(data: INodeData): {
if (data.namespace) { if (data.namespace) {
taskParams.namespace = data.namespace taskParams.namespace = data.namespace
} }
taskParams.yarnQueue = data.yarnQueue
} }
if (data.taskType === 'SPARK') { if (data.taskType === 'SPARK') {
@ -321,7 +322,8 @@ export function formatParams(data: INodeData): {
executorCores: data.executorCores, executorCores: data.executorCores,
executorMemory: data.executorMemory, executorMemory: data.executorMemory,
numExecutors: data.numExecutors, numExecutors: data.numExecutors,
others: data.others others: data.others,
yarnQueue: data.yarnQueue
} }
} }

4
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts

@ -57,7 +57,8 @@ export function useDataQuality({
numExecutors: 2, numExecutors: 2,
executorMemory: '2G', executorMemory: '2G',
executorCores: 2, executorCores: 2,
others: '--conf spark.yarn.maxAppAttempts=1' others: '--conf spark.yarn.maxAppAttempts=1',
yarnQueue: ''
} as INodeData) } as INodeData)
return { return {
@ -84,6 +85,7 @@ export function useDataQuality({
Fields.useExecutorNumber(), Fields.useExecutorNumber(),
Fields.useExecutorMemory(), Fields.useExecutorMemory(),
Fields.useExecutorCores(), Fields.useExecutorCores(),
Fields.useYarnQueue(),
{ {
type: 'input', type: 'input',
field: 'others', field: 'others',

3
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts

@ -53,7 +53,8 @@ export function useFlinkStream({
slot: 1, slot: 1,
taskManager: 2, taskManager: 2,
parallelism: 1, parallelism: 1,
timeoutNotifyStrategy: ['WARN'] timeoutNotifyStrategy: ['WARN'],
yarnQueue: ''
}) })
return { return {

3
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts

@ -53,7 +53,8 @@ export function useFlink({
slot: 1, slot: 1,
taskManager: 2, taskManager: 2,
parallelism: 1, parallelism: 1,
timeoutNotifyStrategy: ['WARN'] timeoutNotifyStrategy: ['WARN'],
yarnQueue: ''
}) })
return { return {

3
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts

@ -44,7 +44,8 @@ export function useMr({
delayTime: 0, delayTime: 0,
timeout: 30, timeout: 30,
programType: 'SCALA', programType: 'SCALA',
timeoutNotifyStrategy: ['WARN'] timeoutNotifyStrategy: ['WARN'],
yarnQueue: '',
} as INodeData) } as INodeData)
return { return {

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts

@ -51,6 +51,7 @@ export function useSpark({
numExecutors: 2, numExecutors: 2,
executorMemory: '2G', executorMemory: '2G',
executorCores: 2, executorCores: 2,
yarnQueue: '',
timeoutNotifyStrategy: ['WARN'] timeoutNotifyStrategy: ['WARN']
} as INodeData) } as INodeData)

2
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -223,6 +223,7 @@ interface ISparkParameters {
executorMemory?: string executorMemory?: string
numExecutors?: number numExecutors?: number
others?: string others?: string
yarnQueue?: string
} }
interface IRuleParameters { interface IRuleParameters {
@ -441,6 +442,7 @@ interface ITaskParams {
degreeOfParallelism?: number degreeOfParallelism?: number
filterCondition?: string filterCondition?: string
listParameters?: Array<any> listParameters?: Array<any>
yarnQueue?: string
} }
interface INodeData interface INodeData

Loading…
Cancel
Save