From ae670e6e1c0c0bcd645ab3e1776eca75b9765b8c Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Mon, 28 Nov 2022 10:09:49 +0800 Subject: [PATCH] [Bug-13010] [Task] The Flink SQL task page selects the pre-job deployment mode, but the task executed by the worker is the Flink local mode (#13011) --- .../plugin/task/flink/FlinkArgsUtilsTest.java | 8 ++++---- .../plugin/task/flink/FlinkArgsUtils.java | 4 ++-- .../plugin/task/flink/FlinkConstants.java | 1 + .../plugin/task/flink/FlinkArgsUtilsTest.java | 8 ++++---- .../task/components/node/fields/use-flink.ts | 12 ++++++++++++ 5 files changed, 23 insertions(+), 10 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index 8d3f83b622..e8d29df4be 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -115,18 +115,18 @@ public class FlinkArgsUtilsTest { } @Test - public void testInitOptionsInClusterMode() throws Exception { + public void testInitOptionsInLocalMode() throws Exception { List initOptions = - FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER)); + FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL)); Assertions.assertEquals(2, initOptions.size()); Assertions.assertTrue(initOptions.contains("set execution.target=local")); Assertions.assertTrue(initOptions.contains("set parallelism.default=4")); } @Test - public void testInitOptionsInApplicationMode() throws Exception { + public void testInitOptionsInClusterMode() throws Exception { List initOptions = FlinkArgsUtils - .buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION)); + .buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER)); Assertions.assertEquals(6, initOptions.size()); Assertions.assertTrue(initOptions.contains("set execution.target=yarn-per-job")); Assertions.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4")); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index 5e63a32bba..7112d0e9e7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -126,9 +126,9 @@ public class FlinkArgsUtils { /** * Currently flink sql on yarn only supports yarn-per-job mode */ - if (FlinkDeployMode.CLUSTER == deployMode) { + if (FlinkDeployMode.LOCAL == deployMode) { // execution.target - initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, "local")); + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.FLINK_LOCAL)); } else { // execution.target initOptions.add( diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java index e448596370..de513e0b0d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java @@ -43,6 +43,7 @@ public class FlinkConstants { public static final String FLINK_YARN_CLUSTER = "yarn-cluster"; public static final String FLINK_YARN_APPLICATION = "yarn-application"; public static final String FLINK_YARN_PER_JOB = "yarn-per-job"; + public static final String FLINK_LOCAL = "local"; public static final String FLINK_RUN_MODE = "-m"; public static final String FLINK_EXECUTION_TARGET = "-t"; public static final String FLINK_YARN_SLOT = "-ys"; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index 62148521aa..952e0c42ed 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -115,18 +115,18 @@ public class FlinkArgsUtilsTest { } @Test - public void testInitOptionsInClusterMode() throws Exception { + public void testInitOptionsInLocalMode() throws Exception { List initOptions = - FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER)); + FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL)); Assertions.assertEquals(2, initOptions.size()); Assertions.assertTrue(initOptions.contains("set execution.target=local")); Assertions.assertTrue(initOptions.contains("set parallelism.default=4")); } @Test - public void testInitOptionsInApplicationMode() throws Exception { + public void testInitOptionsInClusterMode() throws Exception { List initOptions = FlinkArgsUtils - .buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION)); + .buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER)); Assertions.assertEquals(6, initOptions.size()); Assertions.assertTrue(initOptions.contains("set execution.target=yarn-per-job")); Assertions.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4")); diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts index 89035f0441..d987d5ff56 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts @@ -44,6 +44,18 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] { const appNameSpan = computed(() => (model.deployMode !== 'local' ? 24 : 0)) const deployModeOptions = computed(() => { + if (model.programType === 'SQL') { + return [ + { + label: 'per-job/cluster', + value: 'cluster' + }, + { + label: 'local', + value: 'local' + } + ] + } if (model.flinkVersion === '<1.10') { return [ {