Browse Source

[Bug-13010] [Task] The Flink SQL task page selects the pre-job deployment mode, but the task executed by the worker is the Flink local mode

3.1.2-release
Kerwin 2 years ago committed by zhuangchong
parent
commit
a87206228a
  1. 46
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
  2. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
  3. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
  4. 46
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
  5. 12
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts

46
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
@ -60,7 +60,7 @@ public class FlinkArgsUtilsTest {
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}
@ -71,21 +71,21 @@ public class FlinkArgsUtilsTest {
flinkParameters.setFlinkVersion("1.11");
List<String> commandLine1 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));
flinkParameters.setFlinkVersion("<1.10");
List<String> commandLine2 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));
flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine3 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}
@ -95,7 +95,7 @@ public class FlinkArgsUtilsTest {
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}
@ -106,27 +106,29 @@ public class FlinkArgsUtilsTest {
flinkParameters.setProgramType(ProgramType.SQL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
joinStringListWithSpace(commandLine));
}
@Test
public void testInitOptionsInClusterMode() throws Exception {
List<String> initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
Assert.assertEquals(2, initOptions.size());
Assert.assertTrue(initOptions.contains("set execution.target=local"));
Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
public void testInitOptionsInLocalMode() throws Exception {
List<String> initOptions =
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL));
Assertions.assertEquals(2, initOptions.size());
Assertions.assertTrue(initOptions.contains("set execution.target=local"));
Assertions.assertTrue(initOptions.contains("set parallelism.default=4"));
}
@Test
public void testInitOptionsInApplicationMode() throws Exception {
List<String> initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION));
Assert.assertEquals(6, initOptions.size());
Assert.assertTrue(initOptions.contains("set execution.target=yarn-per-job"));
Assert.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4"));
Assert.assertTrue(initOptions.contains("set yarn.application.name=demo-app-name"));
Assert.assertTrue(initOptions.contains("set jobmanager.memory.process.size=1024m"));
Assert.assertTrue(initOptions.contains("set taskmanager.memory.process.size=1024m"));
Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
public void testInitOptionsInClusterMode() throws Exception {
List<String> initOptions = FlinkArgsUtils
.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
Assertions.assertEquals(6, initOptions.size());
Assertions.assertTrue(initOptions.contains("set execution.target=yarn-per-job"));
Assertions.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4"));
Assertions.assertTrue(initOptions.contains("set yarn.application.name=demo-app-name"));
Assertions.assertTrue(initOptions.contains("set jobmanager.memory.process.size=1024m"));
Assertions.assertTrue(initOptions.contains("set taskmanager.memory.process.size=1024m"));
Assertions.assertTrue(initOptions.contains("set parallelism.default=4"));
}
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java

@ -124,9 +124,9 @@ public class FlinkArgsUtils {
/**
* Currently flink sql on yarn only supports yarn-per-job mode
*/
if (FlinkDeployMode.CLUSTER == deployMode) {
if (FlinkDeployMode.LOCAL == deployMode) {
// execution.target
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, "local"));
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.FLINK_LOCAL));
} else {
// execution.target
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.FLINK_YARN_PER_JOB));

1
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

@ -43,6 +43,7 @@ public class FlinkConstants {
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_YARN_APPLICATION = "yarn-application";
public static final String FLINK_YARN_PER_JOB = "yarn-per-job";
public static final String FLINK_LOCAL = "local";
public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_EXECUTION_TARGET = "-t";
public static final String FLINK_YARN_SLOT = "-ys";

46
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
@ -60,7 +60,7 @@ public class FlinkArgsUtilsTest {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}
@ -71,21 +71,21 @@ public class FlinkArgsUtilsTest {
flinkParameters.setFlinkVersion("1.11");
List<String> commandLine1 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));
flinkParameters.setFlinkVersion("<1.10");
List<String> commandLine2 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));
flinkParameters.setFlinkVersion(">=1.12");
List<String> commandLine3 = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}
@ -95,7 +95,7 @@ public class FlinkArgsUtilsTest {
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals(
Assertions.assertEquals(
"flink run -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}
@ -106,27 +106,29 @@ public class FlinkArgsUtilsTest {
flinkParameters.setProgramType(ProgramType.SQL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assert.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
joinStringListWithSpace(commandLine));
}
@Test
public void testInitOptionsInClusterMode() throws Exception {
List<String> initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
Assert.assertEquals(2, initOptions.size());
Assert.assertTrue(initOptions.contains("set execution.target=local"));
Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
public void testInitOptionsInLocalMode() throws Exception {
List<String> initOptions =
FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL));
Assertions.assertEquals(2, initOptions.size());
Assertions.assertTrue(initOptions.contains("set execution.target=local"));
Assertions.assertTrue(initOptions.contains("set parallelism.default=4"));
}
@Test
public void testInitOptionsInApplicationMode() throws Exception {
List<String> initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION));
Assert.assertEquals(6, initOptions.size());
Assert.assertTrue(initOptions.contains("set execution.target=yarn-per-job"));
Assert.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4"));
Assert.assertTrue(initOptions.contains("set yarn.application.name=demo-app-name"));
Assert.assertTrue(initOptions.contains("set jobmanager.memory.process.size=1024m"));
Assert.assertTrue(initOptions.contains("set taskmanager.memory.process.size=1024m"));
Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
public void testInitOptionsInClusterMode() throws Exception {
List<String> initOptions = FlinkArgsUtils
.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
Assertions.assertEquals(6, initOptions.size());
Assertions.assertTrue(initOptions.contains("set execution.target=yarn-per-job"));
Assertions.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4"));
Assertions.assertTrue(initOptions.contains("set yarn.application.name=demo-app-name"));
Assertions.assertTrue(initOptions.contains("set jobmanager.memory.process.size=1024m"));
Assertions.assertTrue(initOptions.contains("set taskmanager.memory.process.size=1024m"));
Assertions.assertTrue(initOptions.contains("set parallelism.default=4"));
}
}

12
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts

@ -44,6 +44,18 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
const appNameSpan = computed(() => (model.deployMode !== 'local' ? 24 : 0))
const deployModeOptions = computed(() => {
if (model.programType === 'SQL') {
return [
{
label: 'per-job/cluster',
value: 'cluster'
},
{
label: 'local',
value: 'local'
}
]
}
if (model.flinkVersion === '<1.10') {
return [
{

Loading…
Cancel
Save