Browse Source

[Improvement][Flink] Change Flink command into FLINK_HOME (#16004)

upstream-dev
JohnHuang 5 months ago committed by GitHub
parent
commit
a7245189a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
  2. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
  3. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

13
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

@ -69,7 +69,7 @@ public class FlinkArgsUtilsTest {
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}
@ -81,7 +81,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));
flinkParameters.setFlinkVersion("<1.10");
@ -89,7 +89,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));
flinkParameters.setFlinkVersion(">=1.12");
@ -97,7 +97,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}
@ -107,7 +107,7 @@ public class FlinkArgsUtilsTest {
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}
@ -117,7 +117,8 @@ public class FlinkArgsUtilsTest {
flinkParameters.setProgramType(ProgramType.SQL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
Assertions.assertEquals(
"${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
joinStringListWithSpace(commandLine));
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java

@ -27,14 +27,14 @@ public class FlinkConstants {
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
public static final String FLINK_COMMAND = "flink";
public static final String FLINK_COMMAND = "${FLINK_HOME}/bin/flink";
public static final String FLINK_RUN = "run";
/**
* flink sql command
* usage: sql-client.sh -i <initialization file>, -f <script file>
*/
public static final String FLINK_SQL_COMMAND = "sql-client.sh";
public static final String FLINK_SQL_COMMAND = "${FLINK_HOME}/bin/sql-client.sh";
/**
* flink run options

13
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

@ -69,7 +69,7 @@ public class FlinkArgsUtilsTest {
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}
@ -81,7 +81,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));
flinkParameters.setFlinkVersion("<1.10");
@ -89,7 +89,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));
flinkParameters.setFlinkVersion(">=1.12");
@ -97,7 +97,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}
@ -107,7 +107,7 @@ public class FlinkArgsUtilsTest {
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals(
"flink run -p 4 -sae -c org.example.Main /opt/job.jar",
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine));
}
@ -117,7 +117,8 @@ public class FlinkArgsUtilsTest {
flinkParameters.setProgramType(ProgramType.SQL);
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
Assertions.assertEquals(
"${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
joinStringListWithSpace(commandLine));
}

Loading…
Cancel
Save