From d74aa47196e8016895619977a2ae52a0b8d3007a Mon Sep 17 00:00:00 2001 From: Dannila <94423827+Dannila@users.noreply.github.com> Date: Mon, 16 May 2022 15:06:21 +0800 Subject: [PATCH] [Fix-10039] Flink run command when perfecting Python jobs (#10042) * [fix] flink task * [fix] flink task (cherry picked from commit d643e1c1cf27d6acfc46deb655e0b5f2f813cb24) --- .../dolphinscheduler/plugin/task/flink/FlinkConstants.java | 1 + .../apache/dolphinscheduler/plugin/task/flink/FlinkTask.java | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java index 2e55de9b25..42cb5ad78c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java @@ -50,6 +50,7 @@ public class FlinkConstants { public static final String FLINK_MAIN_CLASS = "-c"; public static final String FLINK_PARALLELISM = "-p"; public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae"; + public static final String FLINK_PYTHON = "-py"; public static final String FLINK_FORMAT_EXECUTION_TARGET = "set execution.target=%s"; public static final String FLINK_FORMAT_YARN_APPLICATION_NAME = "set yarn.application.name=%s"; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index 966e8a01bd..f70a3d67b7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -149,6 +149,10 @@ public class FlinkTask extends AbstractYarnTask { ResourceInfo mainJar = flinkParameters.getMainJar(); if (mainJar != null) { + // -py + if(ProgramType.PYTHON == programType) { + args.add(FlinkConstants.FLINK_PYTHON); + } args.add(mainJar.getRes()); }