diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index 69f7465a75..782b2f2f97 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -156,12 +156,9 @@ public class FlinkArgsUtils { } // yarn.application.queue - String others = flinkParameters.getOthers(); - if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { - String queue = flinkParameters.getQueue(); - if (StringUtils.isNotEmpty(queue)) { - initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue)); - } + String queue = flinkParameters.getQueue(); + if (StringUtils.isNotEmpty(queue)) { + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue)); } } @@ -241,13 +238,6 @@ public class FlinkArgsUtils { args.add(taskManagerMemory); } - if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { - String queue = flinkParameters.getQueue(); - if (StringUtils.isNotEmpty(queue)) { // -yqu - args.add(FlinkConstants.FLINK_QUEUE); - args.add(queue); - } - } break; case LOCAL: break; @@ -290,6 +280,40 @@ public class FlinkArgsUtils { args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap))); } + // determine yarn queue + determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); return args; } + + private static void determinedYarnQueue(List args, FlinkParameters flinkParameters, + FlinkDeployMode deployMode, String flinkVersion) { + switch (deployMode) { + case CLUSTER: + if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion) + || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) { + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); + } else { + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE); + } + case APPLICATION: + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); + } + } + + private static void doAddQueue(List args, FlinkParameters flinkParameters, String option) { + String others = flinkParameters.getOthers(); + if (StringUtils.isEmpty(others) || !others.contains(option)) { + String queue = flinkParameters.getQueue(); + if (StringUtils.isNotEmpty(queue)) { + switch (option) { + case FlinkConstants.FLINK_QUEUE_FOR_TARGETS: + args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue)); + case FlinkConstants.FLINK_QUEUE_FOR_MODE: + args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE); + args.add(queue); + } + } + } + } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java index de513e0b0d..6e8d51b2ce 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java @@ -48,7 +48,8 @@ public class FlinkConstants { public static final String FLINK_EXECUTION_TARGET = "-t"; public static final String FLINK_YARN_SLOT = "-ys"; public static final String FLINK_APP_NAME = "-ynm"; - public static final String FLINK_QUEUE = "-yqu"; + public static final String FLINK_QUEUE_FOR_MODE = "-yqu"; + public static final String FLINK_QUEUE_FOR_TARGETS = "-Dyarn.application.queue"; public static final String FLINK_TASK_MANAGE = "-yn"; public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; public static final String FLINK_TASK_MANAGE_MEM = "-ytm";