diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index cbd9852e2e..5a42755c1f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.flink; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.util.List; @@ -60,7 +60,7 @@ public class FlinkArgsUtilsTest { FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -71,21 +71,21 @@ public class FlinkArgsUtilsTest { flinkParameters.setFlinkVersion("1.11"); List commandLine1 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine1)); flinkParameters.setFlinkVersion("<1.10"); List commandLine2 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine2)); flinkParameters.setFlinkVersion(">=1.12"); List commandLine3 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine3)); } @@ -95,7 +95,7 @@ public class FlinkArgsUtilsTest { FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -106,27 +106,29 @@ public class FlinkArgsUtilsTest { flinkParameters.setProgramType(ProgramType.SQL); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql", + Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql", joinStringListWithSpace(commandLine)); } @Test - public void testInitOptionsInClusterMode() throws Exception { - List initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER)); - Assert.assertEquals(2, initOptions.size()); - Assert.assertTrue(initOptions.contains("set execution.target=local")); - Assert.assertTrue(initOptions.contains("set parallelism.default=4")); + public void testInitOptionsInLocalMode() throws Exception { + List initOptions = + FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL)); + Assertions.assertEquals(2, initOptions.size()); + Assertions.assertTrue(initOptions.contains("set execution.target=local")); + Assertions.assertTrue(initOptions.contains("set parallelism.default=4")); } @Test - public void testInitOptionsInApplicationMode() throws Exception { - List initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION)); - Assert.assertEquals(6, initOptions.size()); - Assert.assertTrue(initOptions.contains("set execution.target=yarn-per-job")); - Assert.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4")); - Assert.assertTrue(initOptions.contains("set yarn.application.name=demo-app-name")); - Assert.assertTrue(initOptions.contains("set jobmanager.memory.process.size=1024m")); - Assert.assertTrue(initOptions.contains("set taskmanager.memory.process.size=1024m")); - Assert.assertTrue(initOptions.contains("set parallelism.default=4")); + public void testInitOptionsInClusterMode() throws Exception { + List initOptions = FlinkArgsUtils + .buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER)); + Assertions.assertEquals(6, initOptions.size()); + Assertions.assertTrue(initOptions.contains("set execution.target=yarn-per-job")); + Assertions.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4")); + Assertions.assertTrue(initOptions.contains("set yarn.application.name=demo-app-name")); + Assertions.assertTrue(initOptions.contains("set jobmanager.memory.process.size=1024m")); + Assertions.assertTrue(initOptions.contains("set taskmanager.memory.process.size=1024m")); + Assertions.assertTrue(initOptions.contains("set parallelism.default=4")); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index a231b67034..69f7465a75 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -124,9 +124,9 @@ public class FlinkArgsUtils { /** * Currently flink sql on yarn only supports yarn-per-job mode */ - if (FlinkDeployMode.CLUSTER == deployMode) { + if (FlinkDeployMode.LOCAL == deployMode) { // execution.target - initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, "local")); + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.FLINK_LOCAL)); } else { // execution.target initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.FLINK_YARN_PER_JOB)); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java index e448596370..de513e0b0d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java @@ -43,6 +43,7 @@ public class FlinkConstants { public static final String FLINK_YARN_CLUSTER = "yarn-cluster"; public static final String FLINK_YARN_APPLICATION = "yarn-application"; public static final String FLINK_YARN_PER_JOB = "yarn-per-job"; + public static final String FLINK_LOCAL = "local"; public static final String FLINK_RUN_MODE = "-m"; public static final String FLINK_EXECUTION_TARGET = "-t"; public static final String FLINK_YARN_SLOT = "-ys"; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index f71afb4d0e..cdedae8da8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.flink; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.util.List; @@ -60,7 +60,7 @@ public class FlinkArgsUtilsTest { FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -71,21 +71,21 @@ public class FlinkArgsUtilsTest { flinkParameters.setFlinkVersion("1.11"); List commandLine1 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine1)); flinkParameters.setFlinkVersion("<1.10"); List commandLine2 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine2)); flinkParameters.setFlinkVersion(">=1.12"); List commandLine3 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine3)); } @@ -95,7 +95,7 @@ public class FlinkArgsUtilsTest { FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals( + Assertions.assertEquals( "flink run -p 4 -sae -c org.example.Main /opt/job.jar", joinStringListWithSpace(commandLine)); } @@ -106,27 +106,29 @@ public class FlinkArgsUtilsTest { flinkParameters.setProgramType(ProgramType.SQL); List commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters); - Assert.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql", + Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql", joinStringListWithSpace(commandLine)); } @Test - public void testInitOptionsInClusterMode() throws Exception { - List initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER)); - Assert.assertEquals(2, initOptions.size()); - Assert.assertTrue(initOptions.contains("set execution.target=local")); - Assert.assertTrue(initOptions.contains("set parallelism.default=4")); + public void testInitOptionsInLocalMode() throws Exception { + List initOptions = + FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL)); + Assertions.assertEquals(2, initOptions.size()); + Assertions.assertTrue(initOptions.contains("set execution.target=local")); + Assertions.assertTrue(initOptions.contains("set parallelism.default=4")); } @Test - public void testInitOptionsInApplicationMode() throws Exception { - List initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION)); - Assert.assertEquals(6, initOptions.size()); - Assert.assertTrue(initOptions.contains("set execution.target=yarn-per-job")); - Assert.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4")); - Assert.assertTrue(initOptions.contains("set yarn.application.name=demo-app-name")); - Assert.assertTrue(initOptions.contains("set jobmanager.memory.process.size=1024m")); - Assert.assertTrue(initOptions.contains("set taskmanager.memory.process.size=1024m")); - Assert.assertTrue(initOptions.contains("set parallelism.default=4")); + public void testInitOptionsInClusterMode() throws Exception { + List initOptions = FlinkArgsUtils + .buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER)); + Assertions.assertEquals(6, initOptions.size()); + Assertions.assertTrue(initOptions.contains("set execution.target=yarn-per-job")); + Assertions.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4")); + Assertions.assertTrue(initOptions.contains("set yarn.application.name=demo-app-name")); + Assertions.assertTrue(initOptions.contains("set jobmanager.memory.process.size=1024m")); + Assertions.assertTrue(initOptions.contains("set taskmanager.memory.process.size=1024m")); + Assertions.assertTrue(initOptions.contains("set parallelism.default=4")); } } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts index 89035f0441..d987d5ff56 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts @@ -44,6 +44,18 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] { const appNameSpan = computed(() => (model.deployMode !== 'local' ? 24 : 0)) const deployModeOptions = computed(() => { + if (model.programType === 'SQL') { + return [ + { + label: 'per-job/cluster', + value: 'cluster' + }, + { + label: 'local', + value: 'local' + } + ] + } if (model.flinkVersion === '<1.10') { return [ {