
[feature][task-flink] Support Flink application mode (#9577)

Branch: 3.1.0-release
Author: Paul Zhang, 2 years ago, committed by GitHub
Commit: 4a3c3e7797
Changed files (11):

1. docs/docs/en/guide/task/flink.md (30 changes)
2. docs/docs/zh/guide/task/flink.md (2 changes)
3. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java (103 changes)
4. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java (271 changes)
5. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java (14 changes)
6. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java (32 changes)
7. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java (8 changes)
8. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java (307 changes)
9. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java (132 changes)
10. dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java (116 changes)
11. dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts (65 changes)

docs/docs/en/guide/task/flink.md (30 changes)

@@ -30,7 +30,7 @@ Flink task type, used to execute Flink programs. For Flink nodes:
 | Program type | Support Java, Scala, Python and SQL four languages. |
 | Class of main function | The **full path** of Main Class, the entry point of the Flink program. |
 | Main jar package | The jar package of the Flink program (upload by Resource Center). |
-| Deployment mode | Support 2 deployment modes: cluster and local. |
+| Deployment mode | Support 3 deployment modes: cluster, local and application (Flink 1.11 and later. See also [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)). |
 | Initialization script | Script file to initialize session context. |
 | Script | The sql script file developed by the user that should be executed. |
 | Flink version | Select version according to the execution environment. |
@@ -45,6 +45,34 @@ Flink task type, used to execute Flink programs. For Flink nodes:
 | Resource | Appoint resource files in the `Resource` if parameters refer to them. |
 | Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script. |
 | Predecessor task | Selecting a predecessor task for the current task, will set the selected predecessor task as upstream of the current task. |
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally. If it does not need to execute, select the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high to low; tasks with the same priority execute in first-in first-out order.
+- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution.
+- **Environment Name**: Configure the environment name in which to run the script.
+- **Times of failed retry attempts**: The number of times the task is resubmitted after failing.
+- **Failed retry interval**: The time interval (unit: minute) for resubmitting the task after a failure.
+- **Delayed execution time**: The time (unit: minute) that a task delays before execution.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs longer than the "timeout", an alarm email will be sent and the task execution will fail.
+- **Program type**: Support Java, Scala, Python and SQL four languages.
+- **The class of main function**: The **full path** of Main Class, the entry point of the Flink program.
+- **Main jar package**: The jar package of the Flink program (upload by Resource Center).
+- **Deployment mode**: Support 3 deployment modes: cluster, local and application (Flink 1.11 and later. See also [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)).
+- **Initialization script**: Script file to initialize the session context.
+- **Script**: The SQL script file developed by the user that should be executed.
+- **Flink version**: Select the version according to the execution environment.
+- **Task name** (optional): Flink task name.
+- **JobManager memory size**: Used to set the size of JobManager memory, which can be set according to the actual production environment.
+- **Number of slots**: Used to set the number of slots, which can be set according to the actual production environment.
+- **TaskManager memory size**: Used to set the size of TaskManager memory, which can be set according to the actual production environment.
+- **Number of TaskManager**: Used to set the number of TaskManagers, which can be set according to the actual production environment.
+- **Parallelism**: Used to set the degree of parallelism for executing Flink tasks.
+- **Main program parameters**: Set the input parameters for the Flink program and support the substitution of custom parameter variables.
+- **Optional parameters**: Support `--jar`, `--files`, `--archives`, `--conf` format.
+- **Resource**: Appoint resource files in the `Resource` if parameters refer to them.
+- **Custom parameter**: It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script.
+- **Predecessor task**: Selecting a predecessor task for the current task will set the selected predecessor task as upstream of the current task.
 ## Task Example
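For orientation, here is a minimal sketch of how the three deployment modes map onto the flink CLI invocation that the task plugin assembles (condensed from FlinkArgsUtils later in this commit; the version handling is simplified, since the real code also treats ">=1.13" the same way as ">=1.12"):

import java.util.Arrays;
import java.util.List;

public class DeployModeCliSketch {
    // Maps a deploy mode plus Flink version onto the command prefix built in this commit.
    static List<String> cliPrefix(String deployMode, String flinkVersion) {
        switch (deployMode) {
            case "application":
                // Application mode requires Flink 1.11 or later.
                return Arrays.asList("flink", "run-application", "-t", "yarn-application");
            case "cluster":
                // Flink >= 1.12 uses the generic -t switch; older versions use -m yarn-cluster.
                return ">=1.12".equals(flinkVersion)
                        ? Arrays.asList("flink", "run", "-t", "yarn-per-job")
                        : Arrays.asList("flink", "run", "-m", "yarn-cluster");
            default: // local
                return Arrays.asList("flink", "run");
        }
    }

    public static void main(String[] args) {
        System.out.println(cliPrefix("application", "1.11"));
        // [flink, run-application, -t, yarn-application]
    }
}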

docs/docs/zh/guide/task/flink.md (2 changes)

@@ -28,7 +28,7 @@ Flink task type, used to execute Flink programs. For Flink nodes:
 - Program type: supports Java, Scala, Python and SQL.
 - Class of the main function: the **full path** of the Main Class, the entry point of the Flink program.
 - Main jar package: the jar package of the Flink program (uploaded through the Resource Center).
-- Deployment mode: supports two deployment modes, cluster and local.
+- Deployment mode: supports three deployment modes, cluster, local and application (supported on Flink 1.11 and later; see [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)).
 - Initialization script: script file used to initialize the session context.
 - Script: the SQL script file developed by the user to be executed.
 - Flink version: select the version matching the execution environment.

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java (new file, 103 lines)

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;

import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;

public class FileUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(FileUtils.class);

    private FileUtils() {}

    public static String getInitScriptFilePath(TaskExecutionContext taskExecutionContext) {
        return String.format("%s/%s_init.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
    }

    public static String getScriptFilePath(TaskExecutionContext taskExecutionContext) {
        return String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
    }

    public static void generateScriptFile(TaskExecutionContext taskExecutionContext, FlinkParameters flinkParameters) {
        String initScriptFilePath = FileUtils.getInitScriptFilePath(taskExecutionContext);
        String scriptFilePath = FileUtils.getScriptFilePath(taskExecutionContext);
        String initOptionsString = StringUtils.join(
                FlinkArgsUtils.buildInitOptionsForSql(flinkParameters),
                FlinkConstants.FLINK_SQL_NEWLINE
        ).concat(FlinkConstants.FLINK_SQL_NEWLINE);
        writeScriptFile(initScriptFilePath, initOptionsString + flinkParameters.getInitScript());
        writeScriptFile(scriptFilePath, flinkParameters.getRawScript());
    }

    private static void writeScriptFile(String scriptFileFullPath, String script) {
        File scriptFile = new File(scriptFileFullPath);
        Path path = scriptFile.toPath();
        if (Files.exists(path)) {
            try {
                Files.delete(path);
            } catch (IOException e) {
                throw new RuntimeException(String.format("Flink Script file exists in path: %s before creation and cannot be deleted", path), e);
            }
        }

        Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
        try {
            if (SystemUtils.IS_OS_WINDOWS) {
                Files.createFile(path);
            } else {
                if (!scriptFile.getParentFile().exists()) {
                    scriptFile.getParentFile().mkdirs();
                }
                Files.createFile(path, attr);
            }
            if (StringUtils.isNotEmpty(script)) {
                String replacedScript = script.replaceAll("\\r\\n", "\n");
                FileUtils.writeStringToFile(scriptFile, replacedScript, StandardOpenOption.APPEND);
            }
        } catch (IOException e) {
            throw new RuntimeException("Generate flink SQL script error", e);
        }
    }

    private static void writeStringToFile(File file, String content, StandardOpenOption standardOpenOption) {
        try {
            LOGGER.info("Writing content: " + content);
            LOGGER.info("To file: " + file.getAbsolutePath());
            Files.write(file.getAbsoluteFile().toPath(), content.getBytes(StandardCharsets.UTF_8), standardOpenOption);
        } catch (IOException e) {
            throw new RuntimeException("Error writing file: " + file.getAbsoluteFile(), e);
        }
    }
}
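A quick usage sketch of the class above, assuming it sits in the same package; the setters are the ones exercised by FlinkArgsUtilsTest further down, and all values are illustrative only:

public class FileUtilsUsageSketch {
    public static void main(String[] args) {
        // Illustrative values only.
        TaskExecutionContext ctx = new TaskExecutionContext();
        ctx.setExecutePath("/tmp/execution");
        ctx.setTaskAppId("app-id");

        FlinkParameters params = new FlinkParameters();
        params.setProgramType(ProgramType.SQL);
        params.setDeployMode(FlinkDeployMode.CLUSTER);
        params.setInitScript("set sql-client.execution.result-mode=tableau");
        params.setRawScript("SELECT 1");

        // Writes /tmp/execution/app-id_init.sql (the generated "set k=v" options plus the
        // init script) and /tmp/execution/app-id_node.sql (the raw script), rwxr-xr-x.
        FileUtils.generateScriptFile(ctx, params);
    }
}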

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java (new file, 271 lines)

@@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;

import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * flink args utils
 */
public class FlinkArgsUtils {

    private FlinkArgsUtils() {
        throw new IllegalStateException("Utility class");
    }

    private static final String LOCAL_DEPLOY_MODE = "local";
    private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
    private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_12 = ">=1.12";
    private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_13 = ">=1.13";

    /**
     * default flink deploy mode
     */
    public static final FlinkDeployMode DEFAULT_DEPLOY_MODE = FlinkDeployMode.CLUSTER;

    /**
     * build flink command line
     *
     * @param param flink parameters
     * @return argument list
     */
    public static List<String> buildCommandLine(TaskExecutionContext taskExecutionContext, FlinkParameters param) {
        switch (param.getProgramType()) {
            case SQL:
                return buildCommandLineForSql(taskExecutionContext, param);
            default:
                return buildCommandLineForOthers(taskExecutionContext, param);
        }
    }

    /**
     * build flink command line for SQL
     *
     * @return argument list
     */
    private static List<String> buildCommandLineForSql(TaskExecutionContext taskExecutionContext, FlinkParameters flinkParameters) {
        List<String> args = new ArrayList<>();
        args.add(FlinkConstants.FLINK_SQL_COMMAND);

        // -i
        String initScriptFilePath = FileUtils.getInitScriptFilePath(taskExecutionContext);
        args.add(FlinkConstants.FLINK_SQL_INIT_FILE);
        args.add(initScriptFilePath);

        // -f
        String scriptFilePath = FileUtils.getScriptFilePath(taskExecutionContext);
        args.add(FlinkConstants.FLINK_SQL_SCRIPT_FILE);
        args.add(scriptFilePath);

        String others = flinkParameters.getOthers();
        if (StringUtils.isNotEmpty(others)) {
            args.add(others);
        }
        return args;
    }

    public static List<String> buildInitOptionsForSql(FlinkParameters flinkParameters) {
        List<String> initOptions = new ArrayList<>();

        FlinkDeployMode deployMode = Optional.ofNullable(flinkParameters.getDeployMode()).orElse(FlinkDeployMode.CLUSTER);

        // Currently flink sql on yarn only supports yarn-per-job mode
        if (FlinkDeployMode.CLUSTER == deployMode) {
            // execution.target
            initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, "local"));
        } else {
            // execution.target
            initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.FLINK_YARN_PER_JOB));

            // taskmanager.numberOfTaskSlots
            int slot = flinkParameters.getSlot();
            if (slot > 0) {
                initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS, slot));
            }

            // yarn.application.name
            String appName = flinkParameters.getAppName();
            if (StringUtils.isNotEmpty(appName)) {
                initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_NAME, ArgsUtils.escape(appName)));
            }

            // jobmanager.memory.process.size
            String jobManagerMemory = flinkParameters.getJobManagerMemory();
            if (StringUtils.isNotEmpty(jobManagerMemory)) {
                initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE, jobManagerMemory));
            }

            // taskmanager.memory.process.size
            String taskManagerMemory = flinkParameters.getTaskManagerMemory();
            if (StringUtils.isNotEmpty(taskManagerMemory)) {
                initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE, taskManagerMemory));
            }

            // yarn.application.queue
            String others = flinkParameters.getOthers();
            if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
                String queue = flinkParameters.getQueue();
                if (StringUtils.isNotEmpty(queue)) {
                    initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue));
                }
            }
        }

        // parallelism.default
        int parallelism = flinkParameters.getParallelism();
        if (parallelism > 0) {
            initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_PARALLELISM_DEFAULT, parallelism));
        }

        return initOptions;
    }

    private static List<String> buildCommandLineForOthers(TaskExecutionContext taskExecutionContext, FlinkParameters flinkParameters) {
        List<String> args = new ArrayList<>();

        args.add(FlinkConstants.FLINK_COMMAND);
        FlinkDeployMode deployMode = Optional.ofNullable(flinkParameters.getDeployMode()).orElse(DEFAULT_DEPLOY_MODE);
        String flinkVersion = flinkParameters.getFlinkVersion();

        // build run command
        switch (deployMode) {
            case CLUSTER:
                if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion) || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
                    args.add(FlinkConstants.FLINK_RUN);              // run
                    args.add(FlinkConstants.FLINK_EXECUTION_TARGET); // -t
                    args.add(FlinkConstants.FLINK_YARN_PER_JOB);     // yarn-per-job
                } else {
                    args.add(FlinkConstants.FLINK_RUN);          // run
                    args.add(FlinkConstants.FLINK_RUN_MODE);     // -m
                    args.add(FlinkConstants.FLINK_YARN_CLUSTER); // yarn-cluster
                }
                break;
            case APPLICATION:
                args.add(FlinkConstants.FLINK_RUN_APPLICATION);  // run-application
                args.add(FlinkConstants.FLINK_EXECUTION_TARGET); // -t
                args.add(FlinkConstants.FLINK_YARN_APPLICATION); // yarn-application
                break;
            case LOCAL:
                args.add(FlinkConstants.FLINK_RUN); // run
                break;
        }

        String others = flinkParameters.getOthers();

        // build args
        switch (deployMode) {
            case CLUSTER:
            case APPLICATION:
                int slot = flinkParameters.getSlot();
                if (slot > 0) {
                    // -ys
                    args.add(FlinkConstants.FLINK_YARN_SLOT);
                    args.add(String.format("%d", slot));
                }

                String appName = flinkParameters.getAppName();
                if (StringUtils.isNotEmpty(appName)) {
                    // -ynm
                    args.add(FlinkConstants.FLINK_APP_NAME);
                    args.add(ArgsUtils.escape(appName));
                }

                // judge flink version, the parameter -yn has been removed since flink 1.10
                if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
                    int taskManager = flinkParameters.getTaskManager();
                    if (taskManager > 0) {
                        // -yn
                        args.add(FlinkConstants.FLINK_TASK_MANAGE);
                        args.add(String.format("%d", taskManager));
                    }
                }

                String jobManagerMemory = flinkParameters.getJobManagerMemory();
                if (StringUtils.isNotEmpty(jobManagerMemory)) {
                    // -yjm
                    args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
                    args.add(jobManagerMemory);
                }

                String taskManagerMemory = flinkParameters.getTaskManagerMemory();
                if (StringUtils.isNotEmpty(taskManagerMemory)) {
                    // -ytm
                    args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
                    args.add(taskManagerMemory);
                }

                if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
                    String queue = flinkParameters.getQueue();
                    if (StringUtils.isNotEmpty(queue)) {
                        // -yqu
                        args.add(FlinkConstants.FLINK_QUEUE);
                        args.add(queue);
                    }
                }
                break;
            case LOCAL:
                break;
        }

        int parallelism = flinkParameters.getParallelism();
        if (parallelism > 0) {
            // -p
            args.add(FlinkConstants.FLINK_PARALLELISM);
            args.add(String.format("%d", parallelism));
        }

        // If the job is submitted in attached mode, perform a best-effort cluster shutdown
        // when the CLI is terminated abruptly. The task status will be synchronized with
        // the cluster job status.
        args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae

        // -s -yqu -yat -yD -D
        if (StringUtils.isNotEmpty(others)) {
            args.add(others);
        }

        ProgramType programType = flinkParameters.getProgramType();
        String mainClass = flinkParameters.getMainClass();
        if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
            args.add(FlinkConstants.FLINK_MAIN_CLASS); // -c
            args.add(flinkParameters.getMainClass());  // main class
        }

        ResourceInfo mainJar = flinkParameters.getMainJar();
        if (mainJar != null) {
            // -py
            if (ProgramType.PYTHON == programType) {
                args.add(FlinkConstants.FLINK_PYTHON);
            }
            args.add(mainJar.getRes());
        }

        String mainArgs = flinkParameters.getMainArgs();
        if (StringUtils.isNotEmpty(mainArgs)) {
            Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
            args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap)));
        }

        return args;
    }
}
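To see the resulting command concretely, a sketch of driving the class by hand with the same parameter values the unit test below uses (same package assumed; the values are the test's, not defaults):

public class FlinkArgsUtilsUsageSketch {
    public static void main(String[] args) {
        TaskExecutionContext ctx = new TaskExecutionContext();
        ctx.setTaskAppId("app-id");
        ctx.setExecutePath("/tmp/execution");

        FlinkParameters params = new FlinkParameters();
        params.setProgramType(ProgramType.SCALA);
        params.setDeployMode(FlinkDeployMode.APPLICATION);
        params.setMainClass("org.example.Main");
        params.setAppName("demo-app-name");
        params.setSlot(4);
        params.setParallelism(4);
        params.setJobManagerMemory("1024m");
        params.setTaskManagerMemory("1024m");

        ResourceInfo jar = new ResourceInfo();
        jar.setRes("/opt/job.jar");
        params.setMainJar(jar);

        // Prints: flink run-application -t yarn-application -ys 4 -ynm demo-app-name
        //         -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar
        System.out.println(String.join(" ", FlinkArgsUtils.buildCommandLine(ctx, params)));
    }
}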

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java (14 changes)

@@ -39,8 +39,12 @@ public class FlinkConstants {
     /**
      * flink run options
      */
+    public static final String FLINK_RUN_APPLICATION = "run-application";
     public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
+    public static final String FLINK_YARN_APPLICATION = "yarn-application";
+    public static final String FLINK_YARN_PER_JOB = "yarn-per-job";
     public static final String FLINK_RUN_MODE = "-m";
+    public static final String FLINK_EXECUTION_TARGET = "-t";
     public static final String FLINK_YARN_SLOT = "-ys";
     public static final String FLINK_APP_NAME = "-ynm";
     public static final String FLINK_QUEUE = "-yqu";
@@ -51,7 +55,7 @@ public class FlinkConstants {
     public static final String FLINK_PARALLELISM = "-p";
     public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
     public static final String FLINK_PYTHON = "-py";
-
+    // For Flink SQL
     public static final String FLINK_FORMAT_EXECUTION_TARGET = "set execution.target=%s";
     public static final String FLINK_FORMAT_YARN_APPLICATION_NAME = "set yarn.application.name=%s";
     public static final String FLINK_FORMAT_YARN_APPLICATION_QUEUE = "set yarn.application.queue=%s";
@@ -62,12 +66,4 @@ public class FlinkConstants {
     public static final String FLINK_SQL_SCRIPT_FILE = "-f";
     public static final String FLINK_SQL_INIT_FILE = "-i";
     public static final String FLINK_SQL_NEWLINE = ";\n";
-
-    // execution.target options
-    public static final String EXECUTION_TARGET_YARN_PER_JOB = "yarn-per-job";
-    public static final String EXECUTION_TARGET_LOACL = "local";
-
-    public static final String DEPLOY_MODE_CLUSTER = "cluster";
-    public static final String DEPLOY_MODE_LOCAL = "local";
-    public static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
 }
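The FLINK_FORMAT_* constants are ordinary String.format templates that become the lines of the generated init SQL file. A self-contained sketch: the execution.target template is copied from the diff above, while the parallelism template is an assumption inferred from the test expectations further down (it is not shown in this hunk):

public class SqlInitOptionSketch {
    public static void main(String[] args) {
        // Copied from FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET above.
        String executionTarget = "set execution.target=%s";
        // Assumed shape of FLINK_FORMAT_PARALLELISM_DEFAULT (not shown in this hunk).
        String parallelismDefault = "set parallelism.default=%d";

        // Each formatted option ends up as one ";\n"-terminated line of <taskAppId>_init.sql.
        System.out.println(String.format(executionTarget, "yarn-per-job")); // set execution.target=yarn-per-job
        System.out.println(String.format(parallelismDefault, 4)); // set parallelism.default=4
    }
}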

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java (new file, 32 lines)

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Flink deploy mode
 */
public enum FlinkDeployMode {

    @JsonProperty("local")
    LOCAL,

    @JsonProperty("cluster")
    CLUSTER,

    @JsonProperty("application")
    APPLICATION
}
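The @JsonProperty values are what make the lowercase strings in the stored task-params JSON round-trip to these constants; a minimal sketch with plain Jackson (DolphinScheduler goes through its own JSONUtils wrapper, but the mapping is the same):

import com.fasterxml.jackson.databind.ObjectMapper;

public class DeployModeJsonSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // "application", as the UI submits it, maps to FlinkDeployMode.APPLICATION...
        FlinkDeployMode mode = mapper.readValue("\"application\"", FlinkDeployMode.class);
        System.out.println(mode); // APPLICATION
        // ...and serializes back to the lowercase wire form.
        System.out.println(mapper.writeValueAsString(mode)); // "application"
    }
}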

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java (8 changes)

@@ -39,9 +39,9 @@ public class FlinkParameters extends AbstractParameters {
     private String mainClass;

     /**
-     * deploy mode yarn-cluster yarn-local
+     * deploy mode yarn-cluster yarn-local yarn-application
      */
-    private String deployMode;
+    private FlinkDeployMode deployMode;

     /**
      * arguments
@@ -130,11 +130,11 @@ public class FlinkParameters extends AbstractParameters {
         this.mainClass = mainClass;
     }

-    public String getDeployMode() {
+    public FlinkDeployMode getDeployMode() {
         return deployMode;
     }

-    public void setDeployMode(String deployMode) {
+    public void setDeployMode(FlinkDeployMode deployMode) {
         this.deployMode = deployMode;
     }

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java (307 changes)

@@ -17,8 +17,6 @@
 package org.apache.dolphinscheduler.plugin.task.flink;

-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-
 import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -26,26 +24,14 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;

-import org.apache.commons.lang3.SystemUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;

 public class FlinkTask extends AbstractYarnTask {
@@ -66,7 +52,6 @@ public class FlinkTask extends AbstractYarnTask {
     @Override
     public void init() {
         logger.info("flink task params {}", taskExecutionContext.getTaskParams());
-
         flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class);
@@ -75,10 +60,9 @@ public class FlinkTask extends AbstractYarnTask {
             throw new RuntimeException("flink task params is not valid");
         }
         flinkParameters.setQueue(taskExecutionContext.getQueue());
-        if (ProgramType.SQL != flinkParameters.getProgramType()) {
-            setMainJarName();
-        }
+        setMainJarName();
+        FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
     }

     /**
@@ -88,293 +72,16 @@ public class FlinkTask extends AbstractYarnTask {
      */
     @Override
     protected String buildCommand() {
-        List<String> args = new ArrayList<>();
-
-        if (ProgramType.SQL != flinkParameters.getProgramType()) {
-            // execute flink run [OPTIONS] <jar-file> <arguments>
-            args.add(FlinkConstants.FLINK_COMMAND);
-            args.add(FlinkConstants.FLINK_RUN);
-            args.addAll(populateFlinkOptions());
-        } else {
-            // execute sql-client.sh -f <script file>
-            args.add(FlinkConstants.FLINK_SQL_COMMAND);
-            args.addAll(populateFlinkSqlOptions());
-        }
-
-        String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
-
+        // flink run/run-application [OPTIONS] <jar-file> <arguments>
+        List<String> args = FlinkArgsUtils.buildCommandLine(taskExecutionContext, flinkParameters);
+        String command = ParameterUtils
+                .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
         logger.info("flink task command : {}", command);
         return command;
     }

-    /**
-     * build flink options
-     *
-     * @return argument list
-     */
-    private List<String> populateFlinkOptions() {
-        List<String> args = new ArrayList<>();
-
-        String deployMode = StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ? flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
-
-        if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
-            populateFlinkOnYarnOptions(args);
-        }
-
-        // -p
-        int parallelism = flinkParameters.getParallelism();
-        if (parallelism > 0) {
-            args.add(FlinkConstants.FLINK_PARALLELISM);
-            args.add(String.format("%d", parallelism));
-        }
-
-        /**
-         * -sae
-         *
-         * If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly.
-         * The task status will be synchronized with the cluster job status.
-         */
-        args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT);
-
-        // -s -yqu -yat -yD -D
-        String others = flinkParameters.getOthers();
-        if (StringUtils.isNotEmpty(others)) {
-            args.add(others);
-        }
-
-        // -c
-        ProgramType programType = flinkParameters.getProgramType();
-        String mainClass = flinkParameters.getMainClass();
-        if (programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
-            args.add(FlinkConstants.FLINK_MAIN_CLASS);
-            args.add(flinkParameters.getMainClass());
-        }
-
-        ResourceInfo mainJar = flinkParameters.getMainJar();
-        if (mainJar != null) {
-            // -py
-            if (ProgramType.PYTHON == programType) {
-                args.add(FlinkConstants.FLINK_PYTHON);
-            }
-            args.add(mainJar.getRes());
-        }
-
-        String mainArgs = flinkParameters.getMainArgs();
-        if (StringUtils.isNotEmpty(mainArgs)) {
-            // combining local and global parameters
-            Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-            args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap)));
-        }
-
-        return args;
-    }
-
-    private void populateFlinkOnYarnOptions(List<String> args) {
-        // -m yarn-cluster
-        args.add(FlinkConstants.FLINK_RUN_MODE);
-        args.add(FlinkConstants.FLINK_YARN_CLUSTER);
-
-        // -ys
-        int slot = flinkParameters.getSlot();
-        if (slot > 0) {
-            args.add(FlinkConstants.FLINK_YARN_SLOT);
-            args.add(String.format("%d", slot));
-        }
-
-        // -ynm
-        String appName = flinkParameters.getAppName();
-        if (StringUtils.isNotEmpty(appName)) {
-            args.add(FlinkConstants.FLINK_APP_NAME);
-            args.add(ArgsUtils.escape(appName));
-        }
-
-        /**
-         * -yn
-         *
-         * Note: judge flink version, the parameter -yn has removed from flink 1.10
-         */
-        String flinkVersion = flinkParameters.getFlinkVersion();
-        if (flinkVersion == null || FlinkConstants.FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
-            int taskManager = flinkParameters.getTaskManager();
-            if (taskManager > 0) {
-                args.add(FlinkConstants.FLINK_TASK_MANAGE);
-                args.add(String.format("%d", taskManager));
-            }
-        }
-
-        // -yjm
-        String jobManagerMemory = flinkParameters.getJobManagerMemory();
-        if (StringUtils.isNotEmpty(jobManagerMemory)) {
-            args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
-            args.add(jobManagerMemory);
-        }
-
-        // -ytm
-        String taskManagerMemory = flinkParameters.getTaskManagerMemory();
-        if (StringUtils.isNotEmpty(taskManagerMemory)) {
-            args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
-            args.add(taskManagerMemory);
-        }
-
-        // -yqu
-        String others = flinkParameters.getOthers();
-        if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
-            String queue = flinkParameters.getQueue();
-            if (StringUtils.isNotEmpty(queue)) {
-                args.add(FlinkConstants.FLINK_QUEUE);
-                args.add(queue);
-            }
-        }
-    }
-
-    /**
-     * build flink sql options
-     *
-     * @return argument list
-     */
-    private List<String> populateFlinkSqlOptions() {
-        List<String> args = new ArrayList<>();
-        List<String> defalutOptions = new ArrayList<>();
-
-        String deployMode = StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ? flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
-
-        /**
-         * Currently flink sql on yarn only supports yarn-per-job mode
-         */
-        if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
-            populateFlinkSqlOnYarnOptions(defalutOptions);
-        } else {
-            // execution.target
-            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.EXECUTION_TARGET_LOACL));
-        }
-
-        // parallelism.default
-        int parallelism = flinkParameters.getParallelism();
-        if (parallelism > 0) {
-            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_PARALLELISM_DEFAULT, parallelism));
-        }
-
-        // -i
-        args.add(FlinkConstants.FLINK_SQL_INIT_FILE);
-        args.add(generateInitScriptFile(StringUtils.join(defalutOptions, FlinkConstants.FLINK_SQL_NEWLINE).concat(FlinkConstants.FLINK_SQL_NEWLINE)));
-
-        // -f
-        args.add(FlinkConstants.FLINK_SQL_SCRIPT_FILE);
-        args.add(generateScriptFile());
-
-        String others = flinkParameters.getOthers();
-        if (StringUtils.isNotEmpty(others)) {
-            args.add(others);
-        }
-
-        return args;
-    }
-
-    private void populateFlinkSqlOnYarnOptions(List<String> defalutOptions) {
-        // execution.target
-        defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.EXECUTION_TARGET_YARN_PER_JOB));
-
-        // taskmanager.numberOfTaskSlots
-        int slot = flinkParameters.getSlot();
-        if (slot > 0) {
-            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS, slot));
-        }
-
-        // yarn.application.name
-        String appName = flinkParameters.getAppName();
-        if (StringUtils.isNotEmpty(appName)) {
-            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_NAME, ArgsUtils.escape(appName)));
-        }
-
-        // jobmanager.memory.process.size
-        String jobManagerMemory = flinkParameters.getJobManagerMemory();
-        if (StringUtils.isNotEmpty(jobManagerMemory)) {
-            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE, jobManagerMemory));
-        }
-
-        // taskmanager.memory.process.size
-        String taskManagerMemory = flinkParameters.getTaskManagerMemory();
-        if (StringUtils.isNotEmpty(taskManagerMemory)) {
-            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE, taskManagerMemory));
-        }
-
-        // yarn.application.queue
-        String others = flinkParameters.getOthers();
-        if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
-            String queue = flinkParameters.getQueue();
-            if (StringUtils.isNotEmpty(queue)) {
-                defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue));
-            }
-        }
-    }
-
-    private String generateInitScriptFile(String parameters) {
-        String initScriptFileName = String.format("%s/%s_init.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
-
-        File file = new File(initScriptFileName);
-        Path path = file.toPath();
-        if (!Files.exists(path)) {
-            Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
-            FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
-
-            try {
-                if (SystemUtils.IS_OS_WINDOWS) {
-                    Files.createFile(path);
-                } else {
-                    if (!file.getParentFile().exists()) {
-                        file.getParentFile().mkdirs();
-                    }
-                    Files.createFile(path, attr);
-                }
-                // Flink sql common parameters are written to the script file
-                logger.info("common parameters : {}", parameters);
-                Files.write(path, parameters.getBytes(), StandardOpenOption.APPEND);
-
-                // Flink init script is written to the script file
-                if (StringUtils.isNotEmpty(flinkParameters.getInitScript())) {
-                    String script = flinkParameters.getInitScript().replaceAll("\\r\\n", "\n");
-                    flinkParameters.setInitScript(script);
-                    logger.info("init script : {}", flinkParameters.getInitScript());
-                    Files.write(path, flinkParameters.getInitScript().getBytes(), StandardOpenOption.APPEND);
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("generate flink sql script error", e);
-            }
-        }
-        return initScriptFileName;
-    }
-
-    private String generateScriptFile() {
-        String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
-
-        File file = new File(scriptFileName);
-        Path path = file.toPath();
-        if (!Files.exists(path)) {
-            String script = flinkParameters.getRawScript().replaceAll("\\r\\n", "\n");
-            flinkParameters.setRawScript(script);
-
-            logger.info("raw script : {}", flinkParameters.getRawScript());
-            logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
-
-            Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
-            FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
-
-            try {
-                if (SystemUtils.IS_OS_WINDOWS) {
-                    Files.createFile(path);
-                } else {
-                    if (!file.getParentFile().exists()) {
-                        file.getParentFile().mkdirs();
-                    }
-                    Files.createFile(path, attr);
-                }
-                // Flink sql raw script is written to the script file
-                Files.write(path, flinkParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
-            } catch (IOException e) {
-                throw new RuntimeException("generate flink sql script error", e);
-            }
-        }
-        return scriptFileName;
-    }
-
     @Override
     protected void setMainJarName() {
         ResourceInfo mainJar = flinkParameters.getMainJar();
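buildCommand still pipes the joined argument string through ParameterUtils.convertParameterPlaceholders. Conceptually that is a ${var} substitution over the workflow's defined parameters; a naive stand-in sketch follows (not the real implementation, which also handles built-in date expressions):

import java.util.HashMap;
import java.util.Map;

public class PlaceholderSketch {
    // Simplified stand-in for ParameterUtils.convertParameterPlaceholders: replaces ${key}.
    static String convert(String command, Map<String, String> params) {
        for (Map.Entry<String, String> e : params.entrySet()) {
            command = command.replace("${" + e.getKey() + "}", e.getValue());
        }
        return command;
    }

    public static void main(String[] args) {
        Map<String, String> defined = new HashMap<>();
        defined.put("input", "/data/in");
        System.out.println(convert("flink run -sae job.jar ${input}", defined));
        // flink run -sae job.jar /data/in
    }
}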

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java (new file, 132 lines)

@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;

import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;

import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class FlinkArgsUtilsTest {

    private String joinStringListWithSpace(List<String> stringList) {
        return String.join(" ", stringList);
    }

    private FlinkParameters buildTestFlinkParametersWithDeployMode(FlinkDeployMode flinkDeployMode) {
        FlinkParameters flinkParameters = new FlinkParameters();
        flinkParameters.setProgramType(ProgramType.SCALA);
        flinkParameters.setDeployMode(flinkDeployMode);
        flinkParameters.setParallelism(4);
        ResourceInfo resourceInfo = new ResourceInfo();
        resourceInfo.setId(1);
        resourceInfo.setResourceName("job");
        resourceInfo.setRes("/opt/job.jar");
        flinkParameters.setMainJar(resourceInfo);
        flinkParameters.setMainClass("org.example.Main");
        flinkParameters.setSlot(4);
        flinkParameters.setAppName("demo-app-name");
        flinkParameters.setJobManagerMemory("1024m");
        flinkParameters.setTaskManagerMemory("1024m");
        return flinkParameters;
    }

    private TaskExecutionContext buildTestTaskExecutionContext() {
        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
        taskExecutionContext.setTaskAppId("app-id");
        taskExecutionContext.setExecutePath("/tmp/execution");
        return taskExecutionContext;
    }

    @Test
    public void testRunJarInApplicationMode() throws Exception {
        FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
        List<String> commandLine = FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), flinkParameters);

        Assert.assertEquals(
                "flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                joinStringListWithSpace(commandLine));
    }

    @Test
    public void testRunJarInClusterMode() throws Exception {
        FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
        flinkParameters.setFlinkVersion("1.11");
        List<String> commandLine1 = FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), flinkParameters);

        Assert.assertEquals(
                "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                joinStringListWithSpace(commandLine1));

        flinkParameters.setFlinkVersion("<1.10");
        List<String> commandLine2 = FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), flinkParameters);

        Assert.assertEquals(
                "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                joinStringListWithSpace(commandLine2));

        flinkParameters.setFlinkVersion(">=1.12");
        List<String> commandLine3 = FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), flinkParameters);

        Assert.assertEquals(
                "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                joinStringListWithSpace(commandLine3));
    }

    @Test
    public void testRunJarInLocalMode() throws Exception {
        FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
        List<String> commandLine = FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), flinkParameters);

        Assert.assertEquals(
                "flink run -p 4 -sae -c org.example.Main /opt/job.jar",
                joinStringListWithSpace(commandLine));
    }

    @Test
    public void testRunSql() throws Exception {
        FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
        flinkParameters.setProgramType(ProgramType.SQL);
        List<String> commandLine = FlinkArgsUtils.buildCommandLine(buildTestTaskExecutionContext(), flinkParameters);

        Assert.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
                joinStringListWithSpace(commandLine));
    }

    @Test
    public void testInitOptionsInClusterMode() throws Exception {
        List<String> initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER));
        Assert.assertEquals(2, initOptions.size());
        Assert.assertTrue(initOptions.contains("set execution.target=local"));
        Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
    }

    @Test
    public void testInitOptionsInApplicationMode() throws Exception {
        List<String> initOptions = FlinkArgsUtils.buildInitOptionsForSql(buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION));
        Assert.assertEquals(6, initOptions.size());
        Assert.assertTrue(initOptions.contains("set execution.target=yarn-per-job"));
        Assert.assertTrue(initOptions.contains("set taskmanager.numberOfTaskSlots=4"));
        Assert.assertTrue(initOptions.contains("set yarn.application.name=demo-app-name"));
        Assert.assertTrue(initOptions.contains("set jobmanager.memory.process.size=1024m"));
        Assert.assertTrue(initOptions.contains("set taskmanager.memory.process.size=1024m"));
        Assert.assertTrue(initOptions.contains("set parallelism.default=4"));
    }
}

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java (deleted file, 116 lines)

@@ -1,116 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.flink;

import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Collections;

import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;

@RunWith(PowerMockRunner.class)
@PrepareForTest({
        JSONUtils.class
})
@PowerMockIgnore({"javax.*"})
public class FlinkTaskTest {

    @Test
    public void testBuildCommand() {
        String parameters = buildFlinkParameters();
        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
        when(taskExecutionContext.getQueue()).thenReturn("default");
        FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
        flinkTask.init();
        Assert.assertEquals(
                "flink run " +
                        "-m yarn-cluster " +
                        "-ys 1 " +
                        "-ynm TopSpeedWindowing " +
                        "-yjm 1G " +
                        "-ytm 1G " +
                        "-yqu default " +
                        "-p 2 -sae " +
                        "-c org.apache.flink.streaming.examples.windowing.TopSpeedWindowing " +
                        "TopSpeedWindowing.jar", flinkTask.buildCommand());
    }

    @Test
    public void testBuildCommandWithFlinkSql() {
        String parameters = buildFlinkParametersWithFlinkSql();
        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
        when(taskExecutionContext.getTaskAppId()).thenReturn("4483");
        FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
        flinkTask.init();
        Assert.assertEquals("sql-client.sh -i /tmp/4483_init.sql -f /tmp/4483_node.sql", flinkTask.buildCommand());
    }

    private String buildFlinkParameters() {
        ResourceInfo resource = new ResourceInfo();
        resource.setId(2);
        resource.setResourceName("/TopSpeedWindowing.jar");
        resource.setRes("TopSpeedWindowing.jar");

        FlinkParameters parameters = new FlinkParameters();
        parameters.setLocalParams(Collections.emptyList());
        parameters.setResourceList(Collections.emptyList());
        parameters.setProgramType(ProgramType.JAVA);
        parameters.setMainClass("org.apache.flink.streaming.examples.windowing.TopSpeedWindowing");
        parameters.setMainJar(resource);
        parameters.setDeployMode("cluster");
        parameters.setAppName("TopSpeedWindowing");
        parameters.setFlinkVersion(">=1.10");
        parameters.setJobManagerMemory("1G");
        parameters.setTaskManagerMemory("1G");
        parameters.setSlot(1);
        parameters.setTaskManager(2);
        parameters.setParallelism(2);
        return JSONUtils.toJsonString(parameters);
    }

    private String buildFlinkParametersWithFlinkSql() {
        FlinkParameters parameters = new FlinkParameters();
        parameters.setLocalParams(Collections.emptyList());
        parameters.setInitScript("set sql-client.execution.result-mode=tableau;");
        parameters.setRawScript("selcet 11111;");
        parameters.setProgramType(ProgramType.SQL);
        parameters.setMainClass("");
        parameters.setDeployMode("cluster");
        parameters.setAppName("FlinkSQL");
        parameters.setOthers("");
        parameters.setJobManagerMemory("1G");
        parameters.setTaskManagerMemory("1G");
        parameters.setParallelism(1);
        parameters.setFlinkVersion(">=1.10");
        return JSONUtils.toJsonString(parameters);
    }
}

dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts (65 changes)

@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import { computed, ref, watchEffect } from 'vue'
+import { computed, watch, watchEffect } from 'vue'
 import { useI18n } from 'vue-i18n'
-import { useCustomParams, useDeployMode, useMainJar, useResources } from '.'
+import { useCustomParams, useMainJar, useResources } from '.'
 import type { IJsonItem } from '../types'

 export function useFlink(model: { [field: string]: any }): IJsonItem[] {
@@ -36,14 +36,53 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
   )
   const taskManagerNumberSpan = computed(() =>
-    model.flinkVersion === '<1.10' && model.deployMode === 'cluster' ? 12 : 0
+    model.flinkVersion === '<1.10' && model.deployMode !== 'local' ? 12 : 0
   )
   const deployModeSpan = computed(() =>
-    model.deployMode === 'cluster' ? 12 : 0
+    model.deployMode !== 'local' ? 12 : 0
   )
-  const appNameSpan = computed(() => (model.deployMode === 'cluster' ? 24 : 0))
+  const appNameSpan = computed(() => (model.deployMode !== 'local' ? 24 : 0))
+
+  const deployModeOptions = computed(() => {
+    if (model.flinkVersion === '<1.10') {
+      return [
+        {
+          label: 'cluster',
+          value: 'cluster'
+        },
+        {
+          label: 'local',
+          value: 'local'
+        }
+      ];
+    } else {
+      return [
+        {
+          label: 'per-job/cluster',
+          value: 'cluster'
+        },
+        {
+          label: 'application',
+          value: 'application'
+        },
+        {
+          label: 'local',
+          value: 'local'
+        }
+      ];
+    }
+  })
+
+  watch(
+    () => model.flinkVersion,
+    () => {
+      if (model.flinkVersion === '<1.10' && model.deployMode === 'application') {
+        model.deployMode = 'cluster'
+      }
+    }
+  )

   watchEffect(() => {
     model.flinkVersion = model.programType === 'SQL' ? '>=1.13' : '<1.10'
@@ -86,7 +125,13 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
       }
     },
     useMainJar(model),
-    useDeployMode(24, ref(false)),
+    {
+      type: 'radio',
+      field: 'deployMode',
+      name: t('project.node.deploy_mode'),
+      options: deployModeOptions,
+      span: 24
+    },
     {
       type: 'editor',
       field: 'initScript',
@@ -269,7 +314,11 @@ const FLINK_VERSIONS = [
     value: '<1.10'
   },
   {
-    label: '>=1.10',
-    value: '>=1.10'
+    label: '1.11',
+    value: '1.11'
+  },
+  {
+    label: '>=1.12',
+    value: '>=1.12'
   }
 ]
