From e748c2eb9a8e0e8a365d58a91b5c24db837c8667 Mon Sep 17 00:00:00 2001 From: LiuCanWu <36023380+LiuCanWu@users.noreply.github.com> Date: Thu, 21 Sep 2023 18:46:14 +0800 Subject: [PATCH] fix the #14729 problem (#14902) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 刘阳 Co-authored-by: xiangzihao <460888207@qq.com> --- .../plugin/task/flink/FlinkArgsUtils.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index 6937db6b78..6d28ffcc6d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -273,6 +273,8 @@ public class FlinkArgsUtils { args.add(others); } + // determine yarn queue + determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); ProgramType programType = flinkParameters.getProgramType(); String mainClass = flinkParameters.getMainClass(); if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { @@ -295,8 +297,6 @@ public class FlinkArgsUtils { args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParameterUtils.convert(paramsMap))); } - // determine yarn queue - determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); return args; } @@ -310,8 +310,10 @@ public class FlinkArgsUtils { } else { doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); } + break; case APPLICATION: doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS); + break; } } @@ -323,9 +325,11 @@ public class FlinkArgsUtils { switch (option) { case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS: args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue)); + break; case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE: args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); args.add(yarnQueue); + break; } } }