From cd73e3b5ad3c855ddafc37739fc3c0b46de1a17c Mon Sep 17 00:00:00 2001 From: Kirs Date: Tue, 27 Jul 2021 23:48:15 +0800 Subject: [PATCH] [1.3.7-prepare#5453][Improvement][Task] ds flink task support submit a PyFlink job via the CLI (#5847) issue #5452 pr #5453 --- .../apache/dolphinscheduler/common/Constants.java | 2 +- .../server/utils/FlinkArgsUtils.java | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index e8447fa079..d29791bf5f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -837,7 +837,7 @@ public final class Constants { public static final String FLINK_MAIN_CLASS = "-c"; public static final String FLINK_PARALLELISM = "-p"; public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae"; - + public static final String FLINK_PYTHON = "-py"; public static final int[] NOT_TERMINATED_STATES = new int[]{ ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index dbd92e020f..e12ebfca08 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -117,9 +117,15 @@ public class FlinkArgsUtils { ProgramType programType = param.getProgramType(); String mainClass = param.getMainClass(); - if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { - args.add(Constants.FLINK_MAIN_CLASS); //-c - args.add(param.getMainClass()); //main class + + if (ProgramType.PYTHON == programType) { + // -py + args.add(Constants.FLINK_PYTHON); + } else if (programType != null && StringUtils.isNotEmpty(mainClass)) { + // -c + args.add(Constants.FLINK_MAIN_CLASS); + // main class + args.add(param.getMainClass()); } ResourceInfo mainJar = param.getMainJar();