Browse Source

[bug][plugin]Fix: Correct the way to determine the yarn queue in Flink CommandLine and SQL mode (#14237)

* Fix: Correct the way to determine the yarn queue in Flink CommandLine

* fix the yarn queue in sql mode && refine the code

* refine code

* remove unnecessary comment

* fix yarn queue properties

* remove redundant variable
3.1.8-release
ORuteMa 1 year ago committed by zhuangchong
parent
commit
87481aa7e0
  1. 50
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
  2. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

50
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java

@ -156,12 +156,9 @@ public class FlinkArgsUtils {
}
// yarn.application.queue
String others = flinkParameters.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
String queue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) {
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue));
}
String queue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) {
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue));
}
}
@ -241,13 +238,6 @@ public class FlinkArgsUtils {
args.add(taskManagerMemory);
}
if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
String queue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) { // -yqu
args.add(FlinkConstants.FLINK_QUEUE);
args.add(queue);
}
}
break;
case LOCAL:
break;
@ -290,6 +280,40 @@ public class FlinkArgsUtils {
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap)));
}
// determine yarn queue
determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
return args;
}
private static void determinedYarnQueue(List<String> args, FlinkParameters flinkParameters,
FlinkDeployMode deployMode, String flinkVersion) {
switch (deployMode) {
case CLUSTER:
if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
|| FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
} else {
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE);
}
case APPLICATION:
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
}
}
private static void doAddQueue(List<String> args, FlinkParameters flinkParameters, String option) {
String others = flinkParameters.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(option)) {
String queue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) {
switch (option) {
case FlinkConstants.FLINK_QUEUE_FOR_TARGETS:
args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue));
case FlinkConstants.FLINK_QUEUE_FOR_MODE:
args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE);
args.add(queue);
}
}
}
}
}

3
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

@ -48,7 +48,8 @@ public class FlinkConstants {
public static final String FLINK_EXECUTION_TARGET = "-t";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE = "-yqu";
public static final String FLINK_QUEUE_FOR_MODE = "-yqu";
public static final String FLINK_QUEUE_FOR_TARGETS = "-Dyarn.application.queue";
public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";

Loading…
Cancel
Save