From de2cc0e2352c2a99ab46a036af2aab1fe24d9941 Mon Sep 17 00:00:00 2001 From: ORuteMa <30034544+ORuteMa@users.noreply.github.com> Date: Mon, 12 Jun 2023 11:04:39 +0800 Subject: [PATCH] [bug][plugin]Fix: Correct the way to determine the yarn queue in Flink CommandLine and SQL mode (#14237) * Fix: Correct the way to determine the yarn queue in Flink CommandLine * fix the yarn queue in sql mode && refine the code * refine code * remove unnecessary comment * fix yarn queue properties * remove redundant variable --- .../plugin/task/flink/FlinkArgsUtils.java | 50 ++++++++++++++----- .../plugin/task/flink/FlinkConstants.java | 3 +- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index 20f460857c..f56a69e553 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -164,12 +164,9 @@ public class FlinkArgsUtils { } // yarn.application.queue - String others = flinkParameters.getOthers(); - if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { - String queue = flinkParameters.getQueue(); - if (StringUtils.isNotEmpty(queue)) { - initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue)); - } + String queue = flinkParameters.getQueue(); + if (StringUtils.isNotEmpty(queue)) { + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue)); } } @@ -254,13 +251,6 @@ public class FlinkArgsUtils { args.add(taskManagerMemory); } - if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { - String queue = flinkParameters.getQueue(); - if (StringUtils.isNotEmpty(queue)) { // -yqu - args.add(FlinkConstants.FLINK_QUEUE); - args.add(queue); - } - } break; case LOCAL: break; @@ -306,6 +296,40 @@ public class FlinkArgsUtils { args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap))); } + // determine yarn queue + determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); return args; } + + private static void determinedYarnQueue(List args, FlinkParameters flinkParameters, + FlinkDeployMode deployMode, String flinkVersion) { + switch (deployMode) { + case CLUSTER: + if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion) + || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) { + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); + } else { + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE); + } + case APPLICATION: + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); + } + } + + private static void doAddQueue(List args, FlinkParameters flinkParameters, String option) { + String others = flinkParameters.getOthers(); + if (StringUtils.isEmpty(others) || !others.contains(option)) { + String queue = flinkParameters.getQueue(); + if (StringUtils.isNotEmpty(queue)) { + switch (option) { + case FlinkConstants.FLINK_QUEUE_FOR_TARGETS: + args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue)); + case FlinkConstants.FLINK_QUEUE_FOR_MODE: + args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE); + args.add(queue); + } + } + } + } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java index de513e0b0d..6e8d51b2ce 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java @@ -48,7 +48,8 @@ public class FlinkConstants { public static final String FLINK_EXECUTION_TARGET = "-t"; public static final String FLINK_YARN_SLOT = "-ys"; public static final String FLINK_APP_NAME = "-ynm"; - public static final String FLINK_QUEUE = "-yqu"; + public static final String FLINK_QUEUE_FOR_MODE = "-yqu"; + public static final String FLINK_QUEUE_FOR_TARGETS = "-Dyarn.application.queue"; public static final String FLINK_TASK_MANAGE = "-yn"; public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; public static final String FLINK_TASK_MANAGE_MEM = "-ytm";