Browse Source

cherry-pick [Fix-14729] fix problem with the command generated by the flink task #14902

3.1.9-release
LiuCanWu 1 year ago committed by zhuangchong
parent
commit
28e74e41b9
  1. 28
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
  2. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

28
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java

@ -258,6 +258,8 @@ public class FlinkArgsUtils {
args.add(others);
}
// determine yarn queue
determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
ProgramType programType = flinkParameters.getProgramType();
String mainClass = flinkParameters.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
@ -280,8 +282,6 @@ public class FlinkArgsUtils {
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap)));
}
// determine yarn queue
determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
return args;
}
@ -291,26 +291,30 @@ public class FlinkArgsUtils {
case CLUSTER:
if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
|| FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
} else {
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE);
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
}
break;
case APPLICATION:
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
break;
}
}
private static void doAddQueue(List<String> args, FlinkParameters flinkParameters, String option) {
String others = flinkParameters.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(option)) {
String queue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) {
String yarnQueue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(yarnQueue)) {
switch (option) {
case FlinkConstants.FLINK_QUEUE_FOR_TARGETS:
args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue));
case FlinkConstants.FLINK_QUEUE_FOR_MODE:
args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE);
args.add(queue);
case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS:
args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue));
break;
case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE:
args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
args.add(yarnQueue);
break;
}
}
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

@ -48,8 +48,8 @@ public class FlinkConstants {
public static final String FLINK_EXECUTION_TARGET = "-t";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE_FOR_MODE = "-yqu";
public static final String FLINK_QUEUE_FOR_TARGETS = "-Dyarn.application.queue";
public static final String FLINK_YARN_QUEUE_FOR_MODE = "-yqu";
public static final String FLINK_YARN_QUEUE_FOR_TARGETS = "-Dyarn.application.queue";
public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";

Loading…
Cancel
Save