diff --git a/docs/docs/en/guide/task/flink.md b/docs/docs/en/guide/task/flink.md index 821e16695b..44c9461803 100644 --- a/docs/docs/en/guide/task/flink.md +++ b/docs/docs/en/guide/task/flink.md @@ -30,7 +30,7 @@ Flink task type, used to execute Flink programs. For Flink nodes: | Program type | Support Java, Scala, Python and SQL four languages. | | Class of main function | The **full path** of Main Class, the entry point of the Flink program. | | Main jar package | The jar package of the Flink program (upload by Resource Center). | -| Deployment mode | Support 2 deployment modes: cluster and local. | +| Deployment mode | Support 3 deployment modes: cluster, local and application (Flink 1.11 and later. See also [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)). | | Initialization script | Script file to initialize session context. | | Script | The sql script file developed by the user that should be executed. | | Flink version | Select version according to the execution environment. | @@ -45,6 +45,34 @@ Flink task type, used to execute Flink programs. For Flink nodes: | Resource | Appoint resource files in the `Resource` if parameters refer to them. | | Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script. | | Predecessor task | Selecting a predecessor task for the current task, will set the selected predecessor task as upstream of the current task. | +- **Node name**: The node name in a workflow definition is unique. +- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select the `prohibition execution`. +- **Descriptive information**: Describe the function of the node. 
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high to low, and tasks with the same priority will execute in a first-in first-out order. +- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution. +- **Environment Name**: Configure the environment name in which run the script. +- **Times of failed retry attempts**: The number of times the task failed to resubmit. +- **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task. +- **Delayed execution time**: The time (unit minute) that a task delays in execution. +- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail. +- **Program type**: Support Java, Scala, Python and SQL four languages. +- **The class of main function**: The **full path** of Main Class, the entry point of the Flink program. +- **Main jar package**: The jar package of the Flink program (upload by Resource Center). +- **Deployment mode**: Support 3 deployment modes: cluster, local and application (Flink 1.11 and later. See also [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)). +- **Initialization script**: Script file to initialize session context. +- **Script**: The sql script file developed by the user that should be executed. +- **Flink version**: Select version according to the execution environment. +- **Task name** (optional): Flink task name. +- **JobManager memory size**: Used to set the size of jobManager memories, which can be set according to the actual production environment. +- **Number of slots**: Used to set the number of slots, which can be set according to the actual production environment. 
+- **TaskManager memory size**: Used to set the size of taskManager memories, which can be set according to the actual production environment. +- **Number of TaskManager**: Used to set the number of taskManagers, which can be set according to the actual production environment. +- **Parallelism**: Used to set the degree of parallelism for executing Flink tasks. +- **Main program parameters**: Set the input parameters for the Flink program and support the substitution of custom parameter variables. +- **Optional parameters**: Support `--jar`, `--files`, `--archives`, `--conf` format. +- **Resource**: Appoint resource files in the `Resource` if parameters refer to them. +- **Custom parameter**: It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script. +- **Predecessor task**: Selecting a predecessor task for the current task, will set the selected predecessor task as upstream of the current task. ## Task Example diff --git a/docs/docs/zh/guide/task/flink.md b/docs/docs/zh/guide/task/flink.md index 212f6ee208..2cfb9b4b6d 100644 --- a/docs/docs/zh/guide/task/flink.md +++ b/docs/docs/zh/guide/task/flink.md @@ -28,7 +28,7 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点: - 程序类型:支持 Java、Scala、 Python 和 SQL 四种语言。 - 主函数的 Class:Flink 程序的入口 Main Class 的**全路径**。 - 主程序包:执行 Flink 程序的 jar 包(通过资源中心上传)。 -- 部署方式:支持 cluster 和 local 两种模式的部署。 +- 部署方式:支持 cluster、 local 和 application (Flink 1.11和之后的版本支持,参见 [Run an application in Application Mode](https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode)) 三种模式的部署。 - 初始化脚本:用于初始化会话上下文的脚本文件。 - 脚本:用户开发的应该执行的 SQL 脚本文件。 - Flink 版本:根据所需环境选择对应的版本即可。 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java new file 
mode 100644 index 0000000000..33f0fecbfb --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FileUtils.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.flink; + +import org.apache.commons.lang3.SystemUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.Set; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; + +public class FileUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(FileUtils.class); + private FileUtils() {} + + public static String getInitScriptFilePath(TaskExecutionContext taskExecutionContext) { + return String.format("%s/%s_init.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId()); + } + + public static String getScriptFilePath(TaskExecutionContext taskExecutionContext) { + return String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId()); + } + + public static void generateScriptFile(TaskExecutionContext taskExecutionContext, FlinkParameters flinkParameters) { + String initScriptFilePath = FileUtils.getInitScriptFilePath(taskExecutionContext); + String scriptFilePath = FileUtils.getScriptFilePath(taskExecutionContext); + String initOptionsString = StringUtils.join( + FlinkArgsUtils.buildInitOptionsForSql(flinkParameters), + FlinkConstants.FLINK_SQL_NEWLINE + ).concat(FlinkConstants.FLINK_SQL_NEWLINE); + writeScriptFile(initScriptFilePath, initOptionsString + flinkParameters.getInitScript()); + writeScriptFile(scriptFilePath, flinkParameters.getRawScript()); + } + + private static void writeScriptFile(String 
scriptFileFullPath, String script) { + File scriptFile = new File(scriptFileFullPath); + Path path = scriptFile.toPath(); + if (Files.exists(path)) { + try { + Files.delete(path); + } catch (IOException e) { + throw new RuntimeException(String.format("Flink Script file exists in path: %s before creation and cannot be deleted", path), e); + } + } + + Set perms = PosixFilePermissions.fromString(RWXR_XR_X); + FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); + try { + if (SystemUtils.IS_OS_WINDOWS) { + Files.createFile(path); + } else { + if (!scriptFile.getParentFile().exists()) { + scriptFile.getParentFile().mkdirs(); + } + Files.createFile(path, attr); + } + + if (StringUtils.isNotEmpty(script)) { + String replacedScript = script.replaceAll("\\r\\n", "\n"); + FileUtils.writeStringToFile(scriptFile, replacedScript, StandardOpenOption.APPEND); + } + } catch (IOException e) { + throw new RuntimeException("Generate flink SQL script error", e); + } + } + + private static void writeStringToFile(File file, String content, StandardOpenOption standardOpenOption) { + try { + LOGGER.info("Writing content: " + content); + LOGGER.info("To file: " + file.getAbsolutePath()); + Files.write(file.getAbsoluteFile().toPath(), content.getBytes(StandardCharsets.UTF_8), standardOpenOption); + } catch(IOException e) { + throw new RuntimeException("Error writing file: " + file.getAbsoluteFile(), e); + } + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java new file mode 100644 index 0000000000..53f94ea0d1 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.flink; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; +import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * flink args utils + */ +public class FlinkArgsUtils { + + private FlinkArgsUtils() { + throw new IllegalStateException("Utility class"); + } + + private static final String LOCAL_DEPLOY_MODE = "local"; + private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; + private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_12 = ">=1.12"; + private static final String FLINK_VERSION_AFTER_OR_EQUALS_1_13 = ">=1.13"; + /** + * default flink deploy mode + */ + public static final FlinkDeployMode DEFAULT_DEPLOY_MODE 
= FlinkDeployMode.CLUSTER; + + /** + * build flink command line + * + * @param param flink parameters + * @return argument list + */ + public static List buildCommandLine(TaskExecutionContext taskExecutionContext, FlinkParameters param) { + switch (param.getProgramType()) { + case SQL: + return buildCommandLineForSql(taskExecutionContext, param); + default: + return buildCommandLineForOthers(taskExecutionContext, param); + } + } + + /** + * build flink command line for SQL + * + * @return argument list + */ + private static List buildCommandLineForSql(TaskExecutionContext taskExecutionContext, FlinkParameters flinkParameters) { + List args = new ArrayList<>(); + + args.add(FlinkConstants.FLINK_SQL_COMMAND); + + // -i + String initScriptFilePath = FileUtils.getInitScriptFilePath(taskExecutionContext); + args.add(FlinkConstants.FLINK_SQL_INIT_FILE); + args.add(initScriptFilePath); + + // -f + String scriptFilePath = FileUtils.getScriptFilePath(taskExecutionContext); + args.add(FlinkConstants.FLINK_SQL_SCRIPT_FILE); + args.add(scriptFilePath); + + String others = flinkParameters.getOthers(); + if (StringUtils.isNotEmpty(others)) { + args.add(others); + } + return args; + } + + public static List buildInitOptionsForSql(FlinkParameters flinkParameters) { + List initOptions = new ArrayList<>(); + + FlinkDeployMode deployMode = Optional.ofNullable(flinkParameters.getDeployMode()).orElse(FlinkDeployMode.CLUSTER); + + /** + * Currently flink sql on yarn only supports yarn-per-job mode + */ + if (FlinkDeployMode.CLUSTER == deployMode) { + // execution.target + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, "local")); + } else { + // execution.target + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.FLINK_YARN_PER_JOB)); + + // taskmanager.numberOfTaskSlots + int slot = flinkParameters.getSlot(); + if (slot > 0) { + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS, 
slot)); + } + + // yarn.application.name + String appName = flinkParameters.getAppName(); + if (StringUtils.isNotEmpty(appName)) { + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_NAME, ArgsUtils.escape(appName))); + } + + // jobmanager.memory.process.size + String jobManagerMemory = flinkParameters.getJobManagerMemory(); + if (StringUtils.isNotEmpty(jobManagerMemory)) { + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE, jobManagerMemory)); + } + + // taskmanager.memory.process.size + String taskManagerMemory = flinkParameters.getTaskManagerMemory(); + if (StringUtils.isNotEmpty(taskManagerMemory)) { + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE, taskManagerMemory)); + } + + // yarn.application.queue + String others = flinkParameters.getOthers(); + if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { + String queue = flinkParameters.getQueue(); + if (StringUtils.isNotEmpty(queue)) { + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue)); + } + } + } + + // parallelism.default + int parallelism = flinkParameters.getParallelism(); + if (parallelism > 0) { + initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_PARALLELISM_DEFAULT, parallelism)); + } + + return initOptions; + } + + private static List buildCommandLineForOthers(TaskExecutionContext taskExecutionContext, FlinkParameters flinkParameters) { + List args = new ArrayList<>(); + + args.add(FlinkConstants.FLINK_COMMAND); + FlinkDeployMode deployMode = Optional.ofNullable(flinkParameters.getDeployMode()).orElse(DEFAULT_DEPLOY_MODE); + String flinkVersion = flinkParameters.getFlinkVersion(); + // build run command + switch (deployMode) { + case CLUSTER: + if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion) || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) { + args.add(FlinkConstants.FLINK_RUN); //run + 
args.add(FlinkConstants.FLINK_EXECUTION_TARGET); //-t + args.add(FlinkConstants.FLINK_YARN_PER_JOB); //yarn-per-job + } else { + args.add(FlinkConstants.FLINK_RUN); //run + args.add(FlinkConstants.FLINK_RUN_MODE); //-m + args.add(FlinkConstants.FLINK_YARN_CLUSTER); //yarn-cluster + } + break; + case APPLICATION: + args.add(FlinkConstants.FLINK_RUN_APPLICATION); //run-application + args.add(FlinkConstants.FLINK_EXECUTION_TARGET); //-t + args.add(FlinkConstants.FLINK_YARN_APPLICATION); //yarn-application + break; + case LOCAL: + args.add(FlinkConstants.FLINK_RUN); //run + break; + } + + String others = flinkParameters.getOthers(); + + // build args + switch (deployMode) { + case CLUSTER: + case APPLICATION: + int slot = flinkParameters.getSlot(); + if (slot > 0) { + args.add(FlinkConstants.FLINK_YARN_SLOT); + args.add(String.format("%d", slot)); //-ys + } + + String appName = flinkParameters.getAppName(); + if (StringUtils.isNotEmpty(appName)) { //-ynm + args.add(FlinkConstants.FLINK_APP_NAME); + args.add(ArgsUtils.escape(appName)); + } + + // judge flink version, the parameter -yn has removed from flink 1.10 + if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) { + int taskManager = flinkParameters.getTaskManager(); + if (taskManager > 0) { //-yn + args.add(FlinkConstants.FLINK_TASK_MANAGE); + args.add(String.format("%d", taskManager)); + } + } + String jobManagerMemory = flinkParameters.getJobManagerMemory(); + if (StringUtils.isNotEmpty(jobManagerMemory)) { + args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM); + args.add(jobManagerMemory); //-yjm + } + + String taskManagerMemory = flinkParameters.getTaskManagerMemory(); + if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm + args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM); + args.add(taskManagerMemory); + } + + if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { + String queue = flinkParameters.getQueue(); + if (StringUtils.isNotEmpty(queue)) { // -yqu + 
args.add(FlinkConstants.FLINK_QUEUE); + args.add(queue); + } + } + break; + case LOCAL: + break; + } + + int parallelism = flinkParameters.getParallelism(); + if (parallelism > 0) { + args.add(FlinkConstants.FLINK_PARALLELISM); + args.add(String.format("%d", parallelism)); // -p + } + + // If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly + // The task status will be synchronized with the cluster job status + args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae + + // -s -yqu -yat -yD -D + if (StringUtils.isNotEmpty(others)) { + args.add(others); + } + + ProgramType programType = flinkParameters.getProgramType(); + String mainClass = flinkParameters.getMainClass(); + if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { + args.add(FlinkConstants.FLINK_MAIN_CLASS); //-c + args.add(flinkParameters.getMainClass()); //main class + } + + ResourceInfo mainJar = flinkParameters.getMainJar(); + if (mainJar != null) { + // -py + if(ProgramType.PYTHON == programType) { + args.add(FlinkConstants.FLINK_PYTHON); + } + args.add(mainJar.getRes()); + } + + String mainArgs = flinkParameters.getMainArgs(); + if (StringUtils.isNotEmpty(mainArgs)) { + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); + args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap))); + } + + return args; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java index 42cb5ad78c..91e4efc125 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java +++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java @@ -39,8 +39,12 @@ public class FlinkConstants { /** * flink run options */ + public static final String FLINK_RUN_APPLICATION = "run-application"; public static final String FLINK_YARN_CLUSTER = "yarn-cluster"; + public static final String FLINK_YARN_APPLICATION = "yarn-application"; + public static final String FLINK_YARN_PER_JOB = "yarn-per-job"; public static final String FLINK_RUN_MODE = "-m"; + public static final String FLINK_EXECUTION_TARGET = "-t"; public static final String FLINK_YARN_SLOT = "-ys"; public static final String FLINK_APP_NAME = "-ynm"; public static final String FLINK_QUEUE = "-yqu"; @@ -51,7 +55,7 @@ public class FlinkConstants { public static final String FLINK_PARALLELISM = "-p"; public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae"; public static final String FLINK_PYTHON = "-py"; - + // For Flink SQL public static final String FLINK_FORMAT_EXECUTION_TARGET = "set execution.target=%s"; public static final String FLINK_FORMAT_YARN_APPLICATION_NAME = "set yarn.application.name=%s"; public static final String FLINK_FORMAT_YARN_APPLICATION_QUEUE = "set yarn.application.queue=%s"; @@ -62,12 +66,4 @@ public class FlinkConstants { public static final String FLINK_SQL_SCRIPT_FILE = "-f"; public static final String FLINK_SQL_INIT_FILE = "-i"; public static final String FLINK_SQL_NEWLINE = ";\n"; - - // execution.target options - public static final String EXECUTION_TARGET_YARN_PER_JOB = "yarn-per-job"; - public static final String EXECUTION_TARGET_LOACL = "local"; - - public static final String DEPLOY_MODE_CLUSTER = "cluster"; - public static final String DEPLOY_MODE_LOCAL = "local"; - public static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; } diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java new file mode 100644 index 0000000000..b02cd40f92 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkDeployMode.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.task.flink; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Flink deploy mode + */ +public enum FlinkDeployMode { + @JsonProperty("local") + LOCAL, + @JsonProperty("cluster") + CLUSTER, + @JsonProperty("application") + APPLICATION +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java index dca6fb58a3..8b45641989 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java @@ -39,9 +39,9 @@ public class FlinkParameters extends AbstractParameters { private String mainClass; /** - * deploy mode yarn-cluster yarn-local + * deploy mode yarn-cluster yarn-local yarn-application */ - private String deployMode; + private FlinkDeployMode deployMode; /** * arguments @@ -130,11 +130,11 @@ public class FlinkParameters extends AbstractParameters { this.mainClass = mainClass; } - public String getDeployMode() { + public FlinkDeployMode getDeployMode() { return deployMode; } - public void setDeployMode(String deployMode) { + public void setDeployMode(FlinkDeployMode deployMode) { this.deployMode = deployMode; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index f04282a144..5a78eed1bd 100644 --- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.flink; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; - import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; @@ -26,26 +24,14 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; -import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.lang3.SystemUtils; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; public class FlinkTask extends AbstractYarnTask { @@ -66,7 +52,6 @@ public class FlinkTask extends AbstractYarnTask { @Override public void init() { - logger.info("flink task params {}", taskExecutionContext.getTaskParams()); flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
FlinkParameters.class); @@ -75,10 +60,9 @@ public class FlinkTask extends AbstractYarnTask { throw new RuntimeException("flink task params is not valid"); } flinkParameters.setQueue(taskExecutionContext.getQueue()); + setMainJarName(); - if (ProgramType.SQL != flinkParameters.getProgramType()) { - setMainJarName(); - } + FileUtils.generateScriptFile(taskExecutionContext, flinkParameters); } /** @@ -88,293 +72,16 @@ public class FlinkTask extends AbstractYarnTask { */ @Override protected String buildCommand() { - List args = new ArrayList<>(); + // flink run/run-application [OPTIONS] + List args = FlinkArgsUtils.buildCommandLine(taskExecutionContext, flinkParameters); + + String command = ParameterUtils + .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); - if (ProgramType.SQL != flinkParameters.getProgramType()) { - // execute flink run [OPTIONS] - args.add(FlinkConstants.FLINK_COMMAND); - args.add(FlinkConstants.FLINK_RUN); - args.addAll(populateFlinkOptions()); - } else { - // execute sql-client.sh -f