diff --git a/docs/docs/en/guide/task/flink.md b/docs/docs/en/guide/task/flink.md
index 4307002ed1..f0d081bdec 100644
--- a/docs/docs/en/guide/task/flink.md
+++ b/docs/docs/en/guide/task/flink.md
@@ -2,7 +2,11 @@
 
 ## Overview
 
-Flink task type for executing Flink programs. For Flink nodes, the worker submits the task by using the Flink command `flink run`. See [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) for more details.
+Flink task type, used to execute Flink programs. For Flink nodes:
+
+1. When the program type is Java, Scala or Python, the worker submits the task `flink run` using the Flink command. See [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) for more details.
+
+2. When the program type is SQL, the worker submit tasks using `sql-client.sh`. See [flink sql client](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/) for more details.
 
 ## Create Task
 
@@ -21,10 +25,12 @@ Flink task type for executing Flink programs. For Flink nodes, the worker submit
 - **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task.
 - **Delayed execution time**: The time (unit minute) that a task delays in execution.
 - **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail.
-- **Program type**: Supports Java, Scala and Python.
+- **Program type**: Support Java, Scala, Python and SQL four languages.
 - **The class of main function**: The **full path** of Main Class, the entry point of the Flink program.
 - **Main jar package**: The jar package of the Flink program (upload by Resource Center).
 - **Deployment mode**: Support 2 deployment modes: cluster and local.
+- **Initialization script**: Script file to initialize session context.
+- **Script**: The sql script file developed by the user that should be executed.
 - **Flink version**: Select version according to the execution env.
 - **Task name** (optional): Flink task name.
 - **JobManager memory size**: Used to set the size of jobManager memories, which can be set according to the actual production environment.
@@ -64,6 +70,14 @@ Configure the required content according to the parameter descriptions above.
 
 ![demo-flink-simple](/img/tasks/demo/flink_task02.png)
 
+### Execute the FlinkSQL Program
+
+Configure the required content according to the parameter descriptions above.
+
+![demo-flink-sql-simple](/img/tasks/demo/flink_sql_test.png)
+
 ## Notice
 
-JAVA and Scala only used for identification, there is no difference. If use Python to develop Flink, there is no class of the main function and the rest is the same.
+- JAVA and Scala only used for identification, there is no difference. If use Python to develop Flink, there is no class of the main function and the rest is the same.
+
+- Use SQL to execute Flink SQL tasks, currently only Flink 1.13 and above are supported.
diff --git a/docs/docs/zh/guide/task/flink.md b/docs/docs/zh/guide/task/flink.md
index e4a0dac084..84acd60422 100644
--- a/docs/docs/zh/guide/task/flink.md
+++ b/docs/docs/zh/guide/task/flink.md
@@ -2,7 +2,11 @@
 
 ## 综述
 
-Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点,worker 会通过使用 flink 命令 `flink run` 的方式提交任务。更多详情查看 [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/)。
+Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点:
+
+1. 当程序类型为 Java、Scala 或 Python 时,worker 使用 Flink 命令提交任务 `flink run`。更多详情查看 [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) 。
+
+2. 当程序类型为 SQL 时,worker 使用`sql-client.sh` 提交任务。更多详情查看 [flink sql client](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/) 。
 
 ## 创建任务
 
@@ -21,10 +25,12 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点,worker
 - 失败重试间隔:任务失败重新提交任务的时间间隔,以分钟为单位。
 - 延迟执行时间:任务延迟执行的时间,以分钟为单位。
 - 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。
-- 程序类型:支持 Java、Scala 和 Python 三种语言。
+- 程序类型:支持 Java、Scala、 Python 和 SQL 四种语言。
 - 主函数的 Class:Flink 程序的入口 Main Class 的**全路径**。
 - 主程序包:执行 Flink 程序的 jar 包(通过资源中心上传)。
 - 部署方式:支持 cluster 和 local 两种模式的部署。
+- 初始化脚本:用于初始化会话上下文的脚本文件。
+- 脚本:用户开发的应该执行的 SQL 脚本文件。
 - Flink 版本:根据所需环境选择对应的版本即可。
 - 任务名称(选填):Flink 程序的名称。
 - jobManager 内存数:用于设置 jobManager 内存数,可根据实际生产环境设置对应的内存数。
@@ -64,6 +70,14 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点,worker
 
 ![demo-flink-simple](/img/tasks/demo/flink_task02.png)
 
+### 执行 FlinkSQL 程序
+
+根据上述参数说明,配置所需的内容即可。
+
+![demo-flink-sql-simple](/img/tasks/demo/flink_sql_test.png)
+
 ## 注意事项:
 
-Java 和 Scala 只是用来标识,没有区别,如果是 Python 开发的 Flink 则没有主函数的 class,其余的都一样。
+- Java 和 Scala 只是用来标识,没有区别,如果是 Python 开发的 Flink 则没有主函数的 class,其余的都一样。
+
+- 使用 SQL 执行 Flink SQL 任务,目前只支持 Flink 1.13及以上版本。
diff --git a/docs/img/tasks/demo/flink_sql_test.png b/docs/img/tasks/demo/flink_sql_test.png
new file mode 100644
index 0000000000..4cba8c7cf2
Binary files /dev/null and b/docs/img/tasks/demo/flink_sql_test.png differ
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
deleted file mode 100644
index ea047bea6c..0000000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.flink;
-
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * flink args utils
- */
-public class FlinkArgsUtils {
-
-    private FlinkArgsUtils() {
-        throw new IllegalStateException("Utility class");
-    }
-
-    private static final String LOCAL_DEPLOY_MODE = "local";
-    private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
-
-    /**
-     * build args
-     *
-     * @param param flink parameters
-     * @return argument list
-     */
-    public static List buildArgs(FlinkParameters param) {
-        List args = new ArrayList<>();
-
-        String deployMode = "cluster";
-        String tmpDeployMode = param.getDeployMode();
-        if (StringUtils.isNotEmpty(tmpDeployMode)) {
-            deployMode = tmpDeployMode;
-        }
-        String others = param.getOthers();
-        if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
-            args.add(FlinkConstants.FLINK_RUN_MODE); //-m
-
-            args.add(FlinkConstants.FLINK_YARN_CLUSTER); //yarn-cluster
-
-            int slot = param.getSlot();
-            if (slot > 0) {
-                args.add(FlinkConstants.FLINK_YARN_SLOT);
-                args.add(String.format("%d", slot)); //-ys
-            }
-
-            String appName = param.getAppName();
-            if (StringUtils.isNotEmpty(appName)) { //-ynm
-                args.add(FlinkConstants.FLINK_APP_NAME);
-                args.add(ArgsUtils.escape(appName));
-            }
-
-            // judge flink version, the parameter -yn has removed from flink 1.10
-            String flinkVersion = param.getFlinkVersion();
-            if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
-                int taskManager = param.getTaskManager();
-                if (taskManager > 0) { //-yn
-                    args.add(FlinkConstants.FLINK_TASK_MANAGE);
-                    args.add(String.format("%d", taskManager));
-                }
-            }
-            String jobManagerMemory = param.getJobManagerMemory();
-            if (StringUtils.isNotEmpty(jobManagerMemory)) {
-                args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
-                args.add(jobManagerMemory); //-yjm
-            }
-
-            String taskManagerMemory = param.getTaskManagerMemory();
-            if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
-                args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
-                args.add(taskManagerMemory);
-            }
-
-            if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
-                String queue = param.getQueue();
-                if (StringUtils.isNotEmpty(queue)) { // -yqu
-                    args.add(FlinkConstants.FLINK_QUEUE);
-                    args.add(queue);
-                }
-            }
-        }
-
-        int parallelism = param.getParallelism();
-        if (parallelism > 0) {
-            args.add(FlinkConstants.FLINK_PARALLELISM);
-            args.add(String.format("%d", parallelism)); // -p
-        }
-
-        // If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
-        // The task status will be synchronized with the cluster job status
-        args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
-
-        // -s -yqu -yat -yD -D
-        if (StringUtils.isNotEmpty(others)) {
-            args.add(others);
-        }
-
-        ProgramType programType = param.getProgramType();
-        String mainClass = param.getMainClass();
-        if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
-            args.add(FlinkConstants.FLINK_MAIN_CLASS); //-c
-            args.add(param.getMainClass()); //main class
-        }
-
-        ResourceInfo mainJar = param.getMainJar();
-        if (mainJar != null) {
-            args.add(mainJar.getRes());
-        }
-
-        String mainArgs = param.getMainArgs();
-        if (StringUtils.isNotEmpty(mainArgs)) {
-            args.add(mainArgs);
-        }
-
-        return args;
-    }
-
-}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index 7a3c1c6fc9..2e55de9b25 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -24,7 +24,20 @@ public class FlinkConstants {
     }
 
     /**
-     * flink
+     * flink command
+     * usage: flink run [OPTIONS]
+     */
+    public static final String FLINK_COMMAND = "flink";
+    public static final String FLINK_RUN = "run";
+
+    /**
+     * flink sql command
+     * usage: sql-client.sh -i , -f