|
|
@ -273,6 +273,8 @@ public class FlinkArgsUtils { |
|
|
|
args.add(others); |
|
|
|
args.add(others); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// determine yarn queue
|
|
|
|
|
|
|
|
determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); |
|
|
|
ProgramType programType = flinkParameters.getProgramType(); |
|
|
|
ProgramType programType = flinkParameters.getProgramType(); |
|
|
|
String mainClass = flinkParameters.getMainClass(); |
|
|
|
String mainClass = flinkParameters.getMainClass(); |
|
|
|
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { |
|
|
|
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { |
|
|
@ -295,8 +297,6 @@ public class FlinkArgsUtils { |
|
|
|
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParameterUtils.convert(paramsMap))); |
|
|
|
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParameterUtils.convert(paramsMap))); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// determine yarn queue
|
|
|
|
|
|
|
|
determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion); |
|
|
|
|
|
|
|
return args; |
|
|
|
return args; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -310,8 +310,10 @@ public class FlinkArgsUtils { |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); |
|
|
|
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
break; |
|
|
|
case APPLICATION: |
|
|
|
case APPLICATION: |
|
|
|
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS); |
|
|
|
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS); |
|
|
|
|
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -323,9 +325,11 @@ public class FlinkArgsUtils { |
|
|
|
switch (option) { |
|
|
|
switch (option) { |
|
|
|
case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS: |
|
|
|
case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS: |
|
|
|
args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue)); |
|
|
|
args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue)); |
|
|
|
|
|
|
|
break; |
|
|
|
case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE: |
|
|
|
case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE: |
|
|
|
args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); |
|
|
|
args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE); |
|
|
|
args.add(yarnQueue); |
|
|
|
args.add(yarnQueue); |
|
|
|
|
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|