diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index 6937db6b78..6d28ffcc6d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -273,6 +273,8 @@ public class FlinkArgsUtils { args.add(others); } + // determine yarn queue + determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); ProgramType programType = flinkParameters.getProgramType(); String mainClass = flinkParameters.getMainClass(); if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { @@ -295,8 +297,6 @@ public class FlinkArgsUtils { args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParameterUtils.convert(paramsMap))); } - // determine yarn queue - determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); return args; } @@ -310,8 +310,10 @@ public class FlinkArgsUtils { } else { doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); } + break; case APPLICATION: doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS); + break; } } @@ -323,9 +325,11 @@ public class FlinkArgsUtils { switch (option) { case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS: args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue)); + break; case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE: args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); args.add(yarnQueue); + break; } } }