Browse Source

[Fix-10039] Flink run command when perfecting Python jobs (#10042)

* [fix] flink task

* [fix] flink task

(cherry picked from commit d643e1c1cf)
3.0.0/version-upgrade
Dannila 3 years ago committed by Jiajie Zhong
parent
commit
d74aa47196
  1. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
  2. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

1
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

@ -50,6 +50,7 @@ public class FlinkConstants {
public static final String FLINK_MAIN_CLASS = "-c";
public static final String FLINK_PARALLELISM = "-p";
public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
public static final String FLINK_PYTHON = "-py";
public static final String FLINK_FORMAT_EXECUTION_TARGET = "set execution.target=%s";
public static final String FLINK_FORMAT_YARN_APPLICATION_NAME = "set yarn.application.name=%s";

4
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

@ -149,6 +149,10 @@ public class FlinkTask extends AbstractYarnTask {
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
// -py
if(ProgramType.PYTHON == programType) {
args.add(FlinkConstants.FLINK_PYTHON);
}
args.add(mainJar.getRes());
}

Loading…
Cancel
Save