Browse Source

[Improvement-5452][Task] ds flink task support submit a PyFlink job via the CLI (#5453)

* flink task support submit a PyFlink job via the CLI.

* optimize attribute name.

* Modify pyflink parameter judgment logic
2.0.7-release
zhuangchong 4 years ago committed by GitHub
parent
commit
3026f04d85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -863,6 +863,7 @@ public final class Constants {
public static final String FLINK_MAIN_CLASS = "-c";
public static final String FLINK_PARALLELISM = "-p";
public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
public static final String FLINK_PYTHON = "-py";
public static final int[] NOT_TERMINATED_STATES = new int[] {

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@ -117,9 +117,15 @@ public class FlinkArgsUtils {
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(Constants.FLINK_MAIN_CLASS); //-c
args.add(param.getMainClass()); //main class
if (ProgramType.PYTHON == programType) {
// -py
args.add(Constants.FLINK_PYTHON);
} else if (programType != null && StringUtils.isNotEmpty(mainClass)) {
// -c
args.add(Constants.FLINK_MAIN_CLASS);
// main class
args.add(param.getMainClass());
}
ResourceInfo mainJar = param.getMainJar();

Loading…
Cancel
Save