diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index 782b2f2f97..45590d3598 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -258,6 +258,8 @@ public class FlinkArgsUtils { args.add(others); } + // determine yarn queue + determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); ProgramType programType = flinkParameters.getProgramType(); String mainClass = flinkParameters.getMainClass(); if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { @@ -280,8 +282,6 @@ public class FlinkArgsUtils { args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap))); } - // determine yarn queue - determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); return args; } @@ -291,26 +291,30 @@ public class FlinkArgsUtils { case CLUSTER: if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion) || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) { - doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS); } else { - doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE); + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); } + break; case APPLICATION: - doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS); + doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS); + break; } } private static void doAddQueue(List args, FlinkParameters flinkParameters, String option) { String others = flinkParameters.getOthers(); if (StringUtils.isEmpty(others) || !others.contains(option)) { - String queue = flinkParameters.getQueue(); - if (StringUtils.isNotEmpty(queue)) { + String yarnQueue = flinkParameters.getQueue(); + if (StringUtils.isNotEmpty(yarnQueue)) { switch (option) { - case FlinkConstants.FLINK_QUEUE_FOR_TARGETS: - args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue)); - case FlinkConstants.FLINK_QUEUE_FOR_MODE: - args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE); - args.add(queue); + case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS: + args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue)); + break; + case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE: + args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); + args.add(yarnQueue); + break; } } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java index 6e8d51b2ce..b2d7607761 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java @@ -48,8 +48,8 @@ public class FlinkConstants { public static final String FLINK_EXECUTION_TARGET = "-t"; public static final String FLINK_YARN_SLOT = "-ys"; public static final String FLINK_APP_NAME = "-ynm"; - public static final String FLINK_QUEUE_FOR_MODE = "-yqu"; - public static final String FLINK_QUEUE_FOR_TARGETS = "-Dyarn.application.queue"; + public static final String FLINK_YARN_QUEUE_FOR_MODE = "-yqu"; + public static final String FLINK_YARN_QUEUE_FOR_TARGETS = "-Dyarn.application.queue"; public static final String FLINK_TASK_MANAGE = "-yn"; public static final String FLINK_JOB_MANAGE_MEM = "-yjm"; public static final String FLINK_TASK_MANAGE_MEM = "-ytm";