[Feature-9837][plugin/ui] support FlinkSQL Task (#9840)

* flink_sql

* '[refactor]flinksql'

* [refactor] flinksql

* [refactor] dolphinscheduler flinksql

* [refactor] flink sql

* Update docs/docs/en/guide/task/flink.md

* Update docs/docs/zh/guide/task/flink.md

* prettier front-end code

Co-authored-by: Jiajie Zhong <zhongjiajie955@gmail.com>
Co-authored-by: devosend <devosend@gmail.com>
Dannila committed 2 years ago via GitHub
commit 2d36449444
  1. docs/docs/en/guide/task/flink.md (20 lines changed)
  2. docs/docs/zh/guide/task/flink.md (20 lines changed)
  3. docs/img/tasks/demo/flink_sql_test.png (binary)
  4. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java (136 lines changed)
  5. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java (34 lines changed)
  6. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java (36 lines changed)
  7. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java (325 lines changed)
  8. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/ProgramType.java (5 lines changed)
  9. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java (118 lines changed)
  10. dolphinscheduler-ui/src/locales/modules/en_US.ts (2 lines changed)
  11. dolphinscheduler-ui/src/locales/modules/zh_CN.ts (2 lines changed)
  12. dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts (55 lines changed)
  13. dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts (6 lines changed)
  14. dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts (5 lines changed)
  15. dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts (2 lines changed)
  16. dolphinscheduler-ui/src/views/projects/task/components/node/types.ts (1 line changed)

docs/docs/en/guide/task/flink.md (20 lines changed)

@@ -2,7 +2,11 @@
## Overview
Flink task type for executing Flink programs. For Flink nodes, the worker submits the task by using the Flink command `flink run`. See [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) for more details.
Flink task type, used to execute Flink programs. For Flink nodes:
1. When the program type is Java, Scala or Python, the worker submits the task using the Flink command `flink run`. See [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) for more details.
2. When the program type is SQL, the worker submits the task using `sql-client.sh`. See [flink sql client](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/) for more details.
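For illustration, the two submission paths produce command lines of the following shape (taken verbatim from the unit-test expectations added in this change; paths, IDs and options vary per task):

```shell
# Program type Java/Scala/Python
flink run -m yarn-cluster -ys 1 -ynm TopSpeedWindowing -yjm 1G -ytm 1G -yqu default -p 2 -sae -c org.apache.flink.streaming.examples.windowing.TopSpeedWindowing TopSpeedWindowing.jar

# Program type SQL
sql-client.sh -i /tmp/4483_init.sql -f /tmp/4483_node.sql
```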
## Create Task
@@ -21,10 +25,12 @@ Flink task type for executing Flink programs. For Flink nodes, the worker submit
- **Failed retry interval**: The time interval (unit: minute) for resubmitting the task after a failure.
- **Delayed execution time**: The time (unit: minute) that a task's execution is delayed.
- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task run time exceeds the "timeout" threshold, an alarm email is sent and the task execution fails.
- **Program type**: Supports Java, Scala and Python.
- **Program type**: Supports four program types: Java, Scala, Python and SQL.
- **The class of main function**: The **full path** of the Main Class, the entry point of the Flink program.
- **Main jar package**: The jar package of the Flink program (uploaded via the Resource Center).
- **Deployment mode**: Supports two deployment modes: cluster and local.
- **Initialization script**: Script file used to initialize the session context.
- **Script**: The SQL script file, developed by the user, to be executed.
- **Flink version**: Select the version according to the execution environment.
- **Task name** (optional): Flink task name.
- **JobManager memory size**: The JobManager memory size, which can be set according to the actual production environment.
@@ -64,6 +70,14 @@ Configure the required content according to the parameter descriptions above.
![demo-flink-simple](/img/tasks/demo/flink_task02.png)
### Execute the FlinkSQL Program
Configure the required content according to the parameter descriptions above.
![demo-flink-sql-simple](/img/tasks/demo/flink_sql_test.png)
## Notice
JAVA and Scala only used for identification, there is no difference. If use Python to develop Flink, there is no class of the main function and the rest is the same.
- Java and Scala are only used for identification; there is no difference between them. If the Flink job is developed in Python, there is no main-function class, and the rest is the same.
- Executing Flink SQL tasks with the SQL program type currently supports only Flink 1.13 and above.

docs/docs/zh/guide/task/flink.md (20 lines changed)

@@ -2,7 +2,11 @@
## Overview
Flink task type, used to execute Flink programs. For Flink nodes, the worker submits the task using the flink command `flink run`. See [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) for more details.
Flink task type, used to execute Flink programs. For Flink nodes:
1. When the program type is Java, Scala or Python, the worker submits the task using the Flink command `flink run`. See [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) for more details.
2. When the program type is SQL, the worker submits the task using `sql-client.sh`. See [flink sql client](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/) for more details.
## Create Task
@@ -21,10 +25,12 @@ Flink task type, used to execute Flink programs. For Flink nodes, the worker
- Failed retry interval: The time interval (unit: minute) for resubmitting the task after a failure.
- Delayed execution time: The time (unit: minute) that a task's execution is delayed.
- Timeout alarm: Check the timeout alarm and timeout failure. When the task run time exceeds the "timeout" threshold, an alarm email is sent and the task execution fails.
- Program type: Supports three program types: Java, Scala and Python.
- Program type: Supports four program types: Java, Scala, Python and SQL.
- Main class: The **full path** of the Main Class, the entry point of the Flink program.
- Main jar package: The jar package of the Flink program (uploaded via the Resource Center).
- Deployment mode: Supports two deployment modes: cluster and local.
- Initialization script: Script file used to initialize the session context.
- Script: The SQL script file, developed by the user, to be executed.
- Flink version: Select the version according to the execution environment.
- Task name (optional): Flink task name.
- JobManager memory size: The JobManager memory size, which can be set according to the actual production environment.
@@ -64,6 +70,14 @@ Flink task type, used to execute Flink programs. For Flink nodes, the worker
![demo-flink-simple](/img/tasks/demo/flink_task02.png)
### Execute the FlinkSQL Program
Configure the required content according to the parameter descriptions above.
![demo-flink-sql-simple](/img/tasks/demo/flink_sql_test.png)
## Notice
Java and Scala are only used for identification; there is no difference between them. If the Flink job is developed in Python, there is no main-function class, and the rest is the same.
- Java and Scala are only used for identification; there is no difference between them. If the Flink job is developed in Python, there is no main-function class, and the rest is the same.
- Executing Flink SQL tasks with the SQL program type currently supports only Flink 1.13 and above.

docs/img/tasks/demo/flink_sql_test.png (new binary image, 973 KiB; not shown)

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java (file deleted, 136 lines)

@@ -1,136 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* flink args utils
*/
public class FlinkArgsUtils {
private FlinkArgsUtils() {
throw new IllegalStateException("Utility class");
}
private static final String LOCAL_DEPLOY_MODE = "local";
private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
/**
* build args
*
* @param param flink parameters
* @return argument list
*/
public static List<String> buildArgs(FlinkParameters param) {
List<String> args = new ArrayList<>();
String deployMode = "cluster";
String tmpDeployMode = param.getDeployMode();
if (StringUtils.isNotEmpty(tmpDeployMode)) {
deployMode = tmpDeployMode;
}
String others = param.getOthers();
if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
args.add(FlinkConstants.FLINK_RUN_MODE); //-m
args.add(FlinkConstants.FLINK_YARN_CLUSTER); //yarn-cluster
int slot = param.getSlot();
if (slot > 0) {
args.add(FlinkConstants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); //-ys
}
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) { //-ynm
args.add(FlinkConstants.FLINK_APP_NAME);
args.add(ArgsUtils.escape(appName));
}
// judge flink version, the parameter -yn has removed from flink 1.10
String flinkVersion = param.getFlinkVersion();
if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
if (taskManager > 0) { //-yn
args.add(FlinkConstants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
}
String jobManagerMemory = param.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
args.add(jobManagerMemory); //-yjm
}
String taskManagerMemory = param.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
args.add(taskManagerMemory);
}
if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) { // -yqu
args.add(FlinkConstants.FLINK_QUEUE);
args.add(queue);
}
}
}
int parallelism = param.getParallelism();
if (parallelism > 0) {
args.add(FlinkConstants.FLINK_PARALLELISM);
args.add(String.format("%d", parallelism)); // -p
}
// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
// The task status will be synchronized with the cluster job status
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(FlinkConstants.FLINK_MAIN_CLASS); //-c
args.add(param.getMainClass()); //main class
}
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(mainJar.getRes());
}
String mainArgs = param.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
args.add(mainArgs);
}
return args;
}
}

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java (34 lines changed)

@@ -24,7 +24,20 @@ public class FlinkConstants {
}
/**
* flink
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
public static final String FLINK_COMMAND = "flink";
public static final String FLINK_RUN = "run";
/**
* flink sql command
* usage: sql-client.sh -i <initialization file>, -f <script file>
*/
public static final String FLINK_SQL_COMMAND = "sql-client.sh";
/**
* flink run options
*/
public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_RUN_MODE = "-m";
@@ -32,11 +45,28 @@ public class FlinkConstants {
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE = "-yqu";
public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
public static final String FLINK_MAIN_CLASS = "-c";
public static final String FLINK_PARALLELISM = "-p";
public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
public static final String FLINK_FORMAT_EXECUTION_TARGET = "set execution.target=%s";
public static final String FLINK_FORMAT_YARN_APPLICATION_NAME = "set yarn.application.name=%s";
public static final String FLINK_FORMAT_YARN_APPLICATION_QUEUE = "set yarn.application.queue=%s";
public static final String FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE = "set jobmanager.memory.process.size=%s";
public static final String FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE = "set taskmanager.memory.process.size=%s";
public static final String FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS = "set taskmanager.numberOfTaskSlots=%d";
public static final String FLINK_FORMAT_PARALLELISM_DEFAULT = "set parallelism.default=%d";
public static final String FLINK_SQL_SCRIPT_FILE = "-f";
public static final String FLINK_SQL_INIT_FILE = "-i";
public static final String FLINK_SQL_NEWLINE = ";\n";
// execution.target options
public static final String EXECUTION_TARGET_YARN_PER_JOB = "yarn-per-job";
public static final String EXECUTION_TARGET_LOCAL = "local";
public static final String DEPLOY_MODE_CLUSTER = "cluster";
public static final String DEPLOY_MODE_LOCAL = "local";
public static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
}
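The `FLINK_FORMAT_*` templates above are rendered with `String.format` and joined with `FLINK_SQL_NEWLINE` to build the sql-client initialization file. A minimal, self-contained sketch (not part of this commit; class name and values are illustrative, format strings inlined verbatim from this class):

```java
import java.util.ArrayList;
import java.util.List;

public class InitOptionsSketch {
    public static void main(String[] args) {
        List<String> options = new ArrayList<>();
        // Inlined from FLINK_FORMAT_EXECUTION_TARGET / EXECUTION_TARGET_YARN_PER_JOB
        options.add(String.format("set execution.target=%s", "yarn-per-job"));
        // Inlined from FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS
        options.add(String.format("set taskmanager.numberOfTaskSlots=%d", 1));
        // Inlined from FLINK_FORMAT_PARALLELISM_DEFAULT
        options.add(String.format("set parallelism.default=%d", 2));
        // Joined with FLINK_SQL_NEWLINE (";\n") and terminated the same way
        String initBody = String.join(";\n", options).concat(";\n");
        System.out.print(initBody);
        // Output:
        // set execution.target=yarn-per-job;
        // set taskmanager.numberOfTaskSlots=1;
        // set parallelism.default=2;
    }
}
```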

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java (36 lines changed)

@@ -100,10 +100,20 @@ public class FlinkParameters extends AbstractParameters {
/**
* program type
* 0 JAVA,1 SCALA,2 PYTHON
* 0 JAVA,1 SCALA,2 PYTHON,3 SQL
*/
private ProgramType programType;
/**
* flink sql initialization file
*/
private String initScript;
/**
* flink sql script file
*/
private String rawScript;
public ResourceInfo getMainJar() {
return mainJar;
}
@@ -224,9 +234,30 @@ public class FlinkParameters extends AbstractParameters {
this.flinkVersion = flinkVersion;
}
public String getInitScript() {
return initScript;
}
public void setInitScript(String initScript) {
this.initScript = initScript;
}
public String getRawScript() {
return rawScript;
}
public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}
@Override
public boolean checkParameters() {
return mainJar != null && programType != null;
/**
* When saving a task, the parameters cannot be empty. There are two checks:
* (1) When ProgramType is SQL, rawScript cannot be empty.
* (2) When ProgramType is Java/Scala/Python, mainJar cannot be empty.
*/
return programType != null && (rawScript != null || mainJar != null);
}
@Override
@@ -236,5 +267,4 @@
}
return resourceList;
}
}
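For illustration, a hedged sketch of what the relaxed `checkParameters` now accepts; it assumes the dolphinscheduler-task-flink classes from this diff are on the classpath (e.g. the sketch lives in the same `org.apache.dolphinscheduler.plugin.task.flink` package), and the class name is hypothetical:

```java
public class CheckParametersSketch {
    public static void main(String[] args) {
        // SQL task: a non-null rawScript satisfies the check; no main jar needed.
        FlinkParameters sqlParams = new FlinkParameters();
        sqlParams.setProgramType(ProgramType.SQL);
        sqlParams.setRawScript("select 1;");
        System.out.println(sqlParams.checkParameters()); // true

        // JAR-based task: a JAVA/SCALA/PYTHON task still needs mainJar set.
        FlinkParameters jarParams = new FlinkParameters();
        jarParams.setProgramType(ProgramType.JAVA);
        System.out.println(jarParams.checkParameters()); // false: rawScript and mainJar are both null
    }
}
```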

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java (325 lines changed)

@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.plugin.task.flink;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -24,26 +26,30 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class FlinkTask extends AbstractYarnTask {
/**
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
private static final String FLINK_COMMAND = "flink";
private static final String FLINK_RUN = "run";
/**
* flink parameters
* flink parameters
*/
private FlinkParameters flinkParameters;
@@ -68,53 +74,310 @@
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs();
if (ProgramType.SQL != flinkParameters.getProgramType()) {
setMainJarName();
}
}
/**
* create command
*
* @return command
*/
@Override
protected String buildCommand() {
List<String> args = new ArrayList<>();
if (ProgramType.SQL != flinkParameters.getProgramType()) {
// execute flink run [OPTIONS] <jar-file> <arguments>
args.add(FlinkConstants.FLINK_COMMAND);
args.add(FlinkConstants.FLINK_RUN);
args.addAll(populateFlinkOptions());
} else {
// execute sql-client.sh -f <script file>
args.add(FlinkConstants.FLINK_SQL_COMMAND);
args.addAll(populateFlinkSqlOptions());
}
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
logger.info("flink task command : {}", command);
return command;
}
/**
* build flink options
*
* @return argument list
*/
private List<String> populateFlinkOptions() {
List<String> args = new ArrayList<>();
String deployMode = StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ? flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
populateFlinkOnYarnOptions(args);
}
// -p
int parallelism = flinkParameters.getParallelism();
if (parallelism > 0) {
args.add(FlinkConstants.FLINK_PARALLELISM);
args.add(String.format("%d", parallelism));
}
/**
* -sae
*
* If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly.
* The task status will be synchronized with the cluster job status.
*/
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT);
// -s -yqu -yat -yD -D
String others = flinkParameters.getOthers();
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
// -c
ProgramType programType = flinkParameters.getProgramType();
String mainClass = flinkParameters.getMainClass();
if (programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(FlinkConstants.FLINK_MAIN_CLASS);
args.add(flinkParameters.getMainClass());
}
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
args.add(mainJar.getRes());
}
String mainArgs = flinkParameters.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap)));
}
return args;
}
private void populateFlinkOnYarnOptions(List<String> args) {
// -m yarn-cluster
args.add(FlinkConstants.FLINK_RUN_MODE);
args.add(FlinkConstants.FLINK_YARN_CLUSTER);
// -ys
int slot = flinkParameters.getSlot();
if (slot > 0) {
args.add(FlinkConstants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot));
}
// -ynm
String appName = flinkParameters.getAppName();
if (StringUtils.isNotEmpty(appName)) {
args.add(FlinkConstants.FLINK_APP_NAME);
args.add(ArgsUtils.escape(appName));
}
/**
* -yn
*
* Note: check the Flink version; the -yn parameter was removed in Flink 1.10
*/
String flinkVersion = flinkParameters.getFlinkVersion();
if (flinkVersion == null || FlinkConstants.FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = flinkParameters.getTaskManager();
if (taskManager > 0) {
args.add(FlinkConstants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
}
// -yjm
String jobManagerMemory = flinkParameters.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
args.add(jobManagerMemory);
}
logger.info("param Map : {}", paramsMap);
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
logger.info("param args : {}", args);
flinkParameters.setMainArgs(args);
// -ytm
String taskManagerMemory = flinkParameters.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) {
args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
args.add(taskManagerMemory);
}
// -yqu
String others = flinkParameters.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
String queue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(FlinkConstants.FLINK_QUEUE);
args.add(queue);
}
}
}
/**
* create command
* @return command
* build flink sql options
*
* @return argument list
*/
@Override
protected String buildCommand() {
// flink run [OPTIONS] <jar-file> <arguments>
private List<String> populateFlinkSqlOptions() {
List<String> args = new ArrayList<>();
List<String> defaultOptions = new ArrayList<>();
args.add(FLINK_COMMAND);
args.add(FLINK_RUN);
logger.info("flink task args : {}", args);
// other parameters
args.addAll(FlinkArgsUtils.buildArgs(flinkParameters));
String deployMode = StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ? flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
/**
* Currently Flink SQL on YARN only supports yarn-per-job mode
*/
if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
populateFlinkSqlOnYarnOptions(defaultOptions);
} else {
// execution.target
defaultOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.EXECUTION_TARGET_LOCAL));
}
logger.info("flink task command : {}", command);
// parallelism.default
int parallelism = flinkParameters.getParallelism();
if (parallelism > 0) {
defaultOptions.add(String.format(FlinkConstants.FLINK_FORMAT_PARALLELISM_DEFAULT, parallelism));
}
return command;
// -i
args.add(FlinkConstants.FLINK_SQL_INIT_FILE);
args.add(generateInitScriptFile(StringUtils.join(defaultOptions, FlinkConstants.FLINK_SQL_NEWLINE).concat(FlinkConstants.FLINK_SQL_NEWLINE)));
// -f
args.add(FlinkConstants.FLINK_SQL_SCRIPT_FILE);
args.add(generateScriptFile());
String others = flinkParameters.getOthers();
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
return args;
}
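With these options assembled, the final invocation takes the shape `sql-client.sh -i <execute-path>/<task-app-id>_init.sql -f <execute-path>/<task-app-id>_node.sql [others]`, which is exactly what the `testBuildCommandWithFlinkSql` case further down asserts.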
private void populateFlinkSqlOnYarnOptions(List<String> defaultOptions) {
// execution.target
defaultOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.EXECUTION_TARGET_YARN_PER_JOB));
// taskmanager.numberOfTaskSlots
int slot = flinkParameters.getSlot();
if (slot > 0) {
defaultOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS, slot));
}
// yarn.application.name
String appName = flinkParameters.getAppName();
if (StringUtils.isNotEmpty(appName)) {
defaultOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_NAME, ArgsUtils.escape(appName)));
}
// jobmanager.memory.process.size
String jobManagerMemory = flinkParameters.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
defaultOptions.add(String.format(FlinkConstants.FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE, jobManagerMemory));
}
// taskmanager.memory.process.size
String taskManagerMemory = flinkParameters.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) {
defaultOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE, taskManagerMemory));
}
// yarn.application.queue
String others = flinkParameters.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
String queue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) {
defaultOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue));
}
}
}
private String generateInitScriptFile(String parameters) {
String initScriptFileName = String.format("%s/%s_init.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
File file = new File(initScriptFileName);
Path path = file.toPath();
if (!Files.exists(path)) {
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
try {
if (OSUtils.isWindows()) {
Files.createFile(path);
} else {
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
Files.createFile(path, attr);
}
// Flink sql common parameters are written to the script file
logger.info("common parameters : {}", parameters);
Files.write(path, parameters.getBytes(), StandardOpenOption.APPEND);
// Flink init script is written to the script file
if (StringUtils.isNotEmpty(flinkParameters.getInitScript())) {
String script = flinkParameters.getInitScript().replaceAll("\\r\\n", "\n");
flinkParameters.setInitScript(script);
logger.info("init script : {}", flinkParameters.getInitScript());
Files.write(path, flinkParameters.getInitScript().getBytes(), StandardOpenOption.APPEND);
}
} catch (IOException e) {
throw new RuntimeException("generate flink sql script error", e);
}
}
return initScriptFileName;
}
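For the SQL fixture in `FlinkTaskTest` below (cluster deploy mode, app name `FlinkSQL`, 1G JobManager and TaskManager memory, parallelism 1, an init script setting the tableau result mode), the generated `/tmp/4483_init.sql` would contain roughly the following; the exact quoting of the application name depends on `ArgsUtils.escape`:

```
set execution.target=yarn-per-job;
set yarn.application.name=FlinkSQL;
set jobmanager.memory.process.size=1G;
set taskmanager.memory.process.size=1G;
set parallelism.default=1;
set sql-client.execution.result-mode=tableau;
```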
private String generateScriptFile() {
String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
File file = new File(scriptFileName);
Path path = file.toPath();
if (!Files.exists(path)) {
String script = flinkParameters.getRawScript().replaceAll("\\r\\n", "\n");
flinkParameters.setRawScript(script);
logger.info("raw script : {}", flinkParameters.getRawScript());
logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
try {
if (OSUtils.isWindows()) {
Files.createFile(path);
} else {
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
Files.createFile(path, attr);
}
// Flink sql raw script is written to the script file
Files.write(path, flinkParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
} catch (IOException e) {
throw new RuntimeException("generate flink sql script error", e);
}
}
return scriptFileName;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = flinkParameters.getMainJar();
String resourceName = getResourceNameOfMainJar(mainJar);
mainJar.setRes(resourceName);

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/ProgramType.java (5 lines changed)

@@ -22,9 +22,10 @@ package org.apache.dolphinscheduler.plugin.task.flink;
*/
public enum ProgramType {
/**
* 0 JAVA,1 SCALA,2 PYTHON
* 0 JAVA,1 SCALA,2 PYTHON,3 SQL
*/
JAVA,
SCALA,
PYTHON
PYTHON,
SQL
}

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java (new file, 118 lines)

@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Collections;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
JSONUtils.class
})
@PowerMockIgnore({"javax.*"})
public class FlinkTaskTest {
@Test
public void testBuildCommand() {
String parameters = buildFlinkParameters();
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
when(taskExecutionContext.getQueue()).thenReturn("default");
FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
flinkTask.init();
Assert.assertEquals(
"flink run " +
"-m yarn-cluster " +
"-ys 1 " +
"-ynm TopSpeedWindowing " +
"-yjm 1G " +
"-ytm 1G " +
"-yqu default " +
"-p 2 -sae " +
"-c org.apache.flink.streaming.examples.windowing.TopSpeedWindowing " +
"TopSpeedWindowing.jar", flinkTask.buildCommand());
}
@Test
public void testBuildCommandWithFlinkSql() {
String parameters = buildFlinkParametersWithFlinkSql();
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
when(taskExecutionContext.getTaskAppId()).thenReturn("4483");
FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
flinkTask.init();
Assert.assertEquals("sql-client.sh -i /tmp/4483_init.sql -f /tmp/4483_node.sql", flinkTask.buildCommand());
}
private String buildFlinkParameters() {
ResourceInfo resource = new ResourceInfo();
resource.setId(2);
resource.setResourceName("/TopSpeedWindowing.jar");
resource.setRes("TopSpeedWindowing.jar");
FlinkParameters parameters = new FlinkParameters();
parameters.setLocalParams(Collections.emptyList());
parameters.setResourceList(Collections.emptyList());
parameters.setProgramType(ProgramType.JAVA);
parameters.setMainClass("org.apache.flink.streaming.examples.windowing.TopSpeedWindowing");
parameters.setMainJar(resource);
parameters.setDeployMode("cluster");
parameters.setAppName("TopSpeedWindowing");
parameters.setFlinkVersion(">=1.10");
parameters.setJobManagerMemory("1G");
parameters.setTaskManagerMemory("1G");
parameters.setSlot(1);
parameters.setTaskManager(2);
parameters.setParallelism(2);
return JSONUtils.toJsonString(parameters);
}
private String buildFlinkParametersWithFlinkSql() {
FlinkParameters parameters = new FlinkParameters();
parameters.setLocalParams(Collections.emptyList());
parameters.setInitScript("set sql-client.execution.result-mode=tableau;");
parameters.setRawScript("selcet 11111;");
parameters.setProgramType(ProgramType.SQL);
parameters.setMainClass(StringUtils.EMPTY);
parameters.setDeployMode("cluster");
parameters.setAppName("FlinkSQL");
parameters.setOthers(StringUtils.EMPTY);
parameters.setJobManagerMemory("1G");
parameters.setTaskManagerMemory("1G");
parameters.setParallelism(1);
parameters.setFlinkVersion(">=1.10");
return JSONUtils.toJsonString(parameters);
}
}

dolphinscheduler-ui/src/locales/modules/en_US.ts (2 lines changed)

@@ -669,6 +669,8 @@ const project = {
timeout_period_tips: 'Timeout must be a positive integer',
script: 'Script',
script_tips: 'Please enter script(required)',
init_script: 'Initialization script',
init_script_tips: 'Please enter initialization script',
resources: 'Resources',
resources_tips: 'Please select resources',
non_resources_tips: 'Please delete all non-existent resources',

dolphinscheduler-ui/src/locales/modules/zh_CN.ts (2 lines changed)

@@ -661,6 +661,8 @@ const project = {
timeout_period_tips: '超时时长必须为正整数',
script: '脚本',
script_tips: '请输入脚本(必填)',
init_script: '初始化脚本',
init_script_tips: '请输入初始化脚本',
resources: '资源',
resources_tips: '请选择资源',
no_resources_tips: '请删除所有未授权或已删除资源',

dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts (55 lines changed)

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { computed, ref } from 'vue'
import { computed, ref, watchEffect } from 'vue'
import { useI18n } from 'vue-i18n'
import { useCustomParams, useDeployMode, useMainJar, useResources } from '.'
import type { IJsonItem } from '../types'
@@ -22,7 +22,17 @@ import type { IJsonItem } from '../types'
export function useFlink(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const mainClassSpan = computed(() =>
model.programType === 'PYTHON' ? 0 : 24
model.programType === 'PYTHON' || model.programType === 'SQL' ? 0 : 24
)
const mainArgsSpan = computed(() => (model.programType === 'SQL' ? 0 : 24))
const scriptSpan = computed(() => (model.programType === 'SQL' ? 24 : 0))
const flinkVersionOptions = computed(() =>
model.programType === 'SQL'
? [{ label: '>=1.13', value: '>=1.13' }]
: FLINK_VERSIONS
)
const taskManagerNumberSpan = computed(() =>
@@ -35,6 +45,10 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
const appNameSpan = computed(() => (model.deployMode === 'cluster' ? 24 : 0))
watchEffect(() => {
model.flinkVersion = model.programType === 'SQL' ? '>=1.13' : '<1.10'
})
return [
{
type: 'select',
@@ -59,9 +73,13 @@
},
validate: {
trigger: ['input', 'blur'],
required: model.programType !== 'PYTHON',
required: model.programType !== 'PYTHON' && model.programType !== 'SQL',
validator(validate: any, value: string) {
if (model.programType !== 'PYTHON' && !value) {
if (
model.programType !== 'PYTHON' &&
!value &&
model.programType !== 'SQL'
) {
return new Error(t('project.node.main_class_tips'))
}
}
@@ -69,11 +87,33 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
},
useMainJar(model),
useDeployMode(24, ref(false)),
{
type: 'editor',
field: 'initScript',
span: scriptSpan,
name: t('project.node.init_script'),
validate: {
trigger: ['input', 'blur'],
required: false,
message: t('project.node.init_script_tips')
}
},
{
type: 'editor',
field: 'rawScript',
span: scriptSpan,
name: t('project.node.script'),
validate: {
trigger: ['input', 'blur'],
required: true,
message: t('project.node.script_tips')
}
},
{
type: 'select',
field: 'flinkVersion',
name: t('project.node.flink_version'),
options: FLINK_VERSIONS,
options: flinkVersionOptions,
value: model.flinkVersion,
span: deployModeSpan
},
@@ -178,6 +218,7 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
{
type: 'input',
field: 'mainArgs',
span: mainArgsSpan,
name: t('project.node.main_arguments'),
props: {
type: 'textarea',
@@ -214,6 +255,10 @@ const PROGRAM_TYPES = [
{
label: 'PYTHON',
value: 'PYTHON'
},
{
label: 'SQL',
value: 'SQL'
}
]

dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts (6 lines changed)

@@ -49,7 +49,9 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
watch(
() => model.programType,
(value) => {
getMainJars(value)
if (value !== 'SQL') {
getMainJars(value)
}
}
)
@@ -70,7 +72,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
trigger: ['input', 'blur'],
required: model.programType !== 'SQL',
validator(validate: any, value: string) {
if (!value) {
if (!value && model.programType !== 'SQL') {
return new Error(t('project.node.main_package_tips'))
}
}

dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts (5 lines changed)

@@ -343,6 +343,7 @@ export function formatParams(data: INodeData): {
item.value = item.value || ''
return item
}),
initScript: data.initScript,
rawScript: data.rawScript,
resourceList: data.resourceList?.length
? data.resourceList.map((id: number) => ({ id }))
@@ -468,6 +469,10 @@ export function formatModel(data: ITaskData) {
params.rawScript = data.taskParams?.rawScript
}
if (data.taskParams?.initScript) {
params.initScript = data.taskParams?.initScript
}
if (data.taskParams?.switchResult) {
params.switchResult = data.taskParams.switchResult
params.dependTaskList = data.taskParams.switchResult?.dependTaskList

dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts (2 lines changed)

@@ -45,6 +45,8 @@ export function useFlink({
timeout: 30,
programType: 'SCALA',
deployMode: 'cluster',
initScript: '',
rawScript: '',
flinkVersion: '<1.10',
jobManagerMemory: '1G',
taskManagerMemory: '2G',

dolphinscheduler-ui/src/views/projects/task/components/node/types.ts (1 line changed)

@@ -212,6 +212,7 @@ interface ITaskParams {
mainJar?: ISourceItem
localParams?: ILocalParam[]
rawScript?: string
initScript?: string
programType?: string
sparkVersion?: string
flinkVersion?: string
