From 3026f04d8528a63f26d9b62da00a495c8e9f47ab Mon Sep 17 00:00:00 2001 From: zhuangchong <37063904+zhuangchong@users.noreply.github.com> Date: Thu, 17 Jun 2021 15:19:25 +0800 Subject: [PATCH] [Improvement-5452][Task] ds flink task support submit a PyFlink job via the CLI (#5453) * flink task support submit a PyFlink job via the CLI. * optimize attribute name. * Modify pyflink parameter judgment logic --- .../apache/dolphinscheduler/common/Constants.java | 1 + .../server/utils/FlinkArgsUtils.java | 12 +++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 59e65c578e..d9e44c5092 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -863,6 +863,7 @@ public final class Constants { public static final String FLINK_MAIN_CLASS = "-c"; public static final String FLINK_PARALLELISM = "-p"; public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae"; + public static final String FLINK_PYTHON = "-py"; public static final int[] NOT_TERMINATED_STATES = new int[] { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index dbd92e020f..e12ebfca08 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -117,9 +117,15 @@ public class FlinkArgsUtils { ProgramType programType = param.getProgramType(); String mainClass = param.getMainClass(); - if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { - args.add(Constants.FLINK_MAIN_CLASS); //-c - args.add(param.getMainClass()); //main class + + if (ProgramType.PYTHON == programType) { + // -py + args.add(Constants.FLINK_PYTHON); + } else if (programType != null && StringUtils.isNotEmpty(mainClass)) { + // -c + args.add(Constants.FLINK_MAIN_CLASS); + // main class + args.add(param.getMainClass()); } ResourceInfo mainJar = param.getMainJar();