From 9778652f171d71266c47468be5bf478ded9cad42 Mon Sep 17 00:00:00 2001 From: Rick Cheng Date: Wed, 19 Jul 2023 10:09:30 +0800 Subject: [PATCH] [Improvement-14136][task] Support submitting spark-sql task with the sql file in resource center (#14527) Co-authored-by: xiangzihao <460888207@qq.com> --- .../plugin/task/hivecli/HiveCliConstants.java | 2 - .../plugin/task/spark/SparkConstants.java | 4 + .../plugin/task/spark/SparkParameters.java | 133 +----------------- .../plugin/task/spark/SparkTask.java | 34 ++++- .../src/locales/en_US/project.ts | 3 + .../src/locales/zh_CN/project.ts | 3 + .../components/node/fields/use-hive-cli.ts | 30 ++-- .../task/components/node/fields/use-spark.ts | 49 ++++++- .../task/components/node/format-data.ts | 5 +- .../task/components/node/tasks/use-spark.ts | 3 +- .../projects/task/components/node/types.ts | 2 + 11 files changed, 111 insertions(+), 157 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliConstants.java index 8b92999241..848e4af427 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliConstants.java @@ -28,6 +28,4 @@ public class HiveCliConstants { public static final String HIVE_CLI_EXECUTE_FILE = "hive -f"; - public static final String HIVE_CLI_EXECUTE_SCRIPT = "hive -e \"%s\""; - } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java index d4230c9d8c..c76436766e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java @@ -101,4 +101,8 @@ public class SparkConstants { */ public static final String SPARK_SUBMIT_COMMAND = "${SPARK_HOME}/bin/spark-submit"; + public static final String TYPE_SCRIPT = "SCRIPT"; + + public static final String TYPE_FILE = "FILE"; + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java index 0bb40e7324..c5fcb5b76b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java @@ -23,9 +23,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import java.util.ArrayList; import java.util.List; -/** - * spark parameters - */ +import lombok.Data; + +@Data public class SparkParameters extends AbstractParameters { /** @@ -109,134 +109,9 @@ public class SparkParameters extends AbstractParameters { return namespace; } - public void setNamespace(String namespace) { - this.namespace = namespace; - } - - /** - * resource list - */ private List resourceList = new ArrayList<>(); - public ResourceInfo getMainJar() { - return mainJar; - } - - public void setMainJar(ResourceInfo mainJar) { - this.mainJar = mainJar; - } - - public String getMainClass() { - return mainClass; - } - - public void setMainClass(String mainClass) { - this.mainClass = mainClass; - } - - public String getDeployMode() { - return deployMode; - } - - public void setDeployMode(String deployMode) { - this.deployMode = deployMode; - } - - public String getMainArgs() { - return mainArgs; - } - - public void setMainArgs(String mainArgs) { - this.mainArgs = mainArgs; - } - - public int getDriverCores() { - return driverCores; - } - - public void setDriverCores(int driverCores) { - this.driverCores = driverCores; - } - - public String getDriverMemory() { - return driverMemory; - } - - public void setDriverMemory(String driverMemory) { - this.driverMemory = driverMemory; - } - - public int getNumExecutors() { - return numExecutors; - } - - public void setNumExecutors(int numExecutors) { - this.numExecutors = numExecutors; - } - - public int getExecutorCores() { - return executorCores; - } - - public void setExecutorCores(int executorCores) { - this.executorCores = executorCores; - } - - public String getExecutorMemory() { - return executorMemory; - } - - public void setExecutorMemory(String executorMemory) { - this.executorMemory = executorMemory; - } - - public String getAppName() { - return appName; - } - - public void setAppName(String appName) { - this.appName = appName; - } - - public String getYarnQueue() { - return yarnQueue; - } - - public void setYarnQueue(String yarnQueue) { - this.yarnQueue = yarnQueue; - } - - public String getOthers() { - return others; - } - - public void setOthers(String others) { - this.others = others; - } - - public List getResourceList() { - return resourceList; - } - - public void setResourceList(List resourceList) { - this.resourceList = resourceList; - } - - public ProgramType getProgramType() { - return programType; - } - - public void setProgramType(ProgramType programType) { - this.programType = programType; - } - - public String getRawScript() { - return rawScript; - } - - public void setRawScript(String rawScript) { - this.rawScript = rawScript; - } + private String sqlExecutionType; @Override public boolean checkParameters() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index 0a27d9011e..675320d49f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -26,6 +26,7 @@ import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.SPARK import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; @@ -33,11 +34,13 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -196,8 +199,28 @@ public class SparkTask extends AbstractYarnTask { // bin/spark-sql -f fileName if (ProgramType.SQL == programType) { + String sqlContent = ""; + String resourceFileName = ""; args.add(SparkConstants.SQL_FROM_FILE); - args.add(generateScriptFile()); + if (SparkConstants.TYPE_FILE.equals(sparkParameters.getSqlExecutionType())) { + final List resourceInfos = sparkParameters.getResourceList(); + if (resourceInfos.size() > 1) { + log.warn("more than 1 files detected, use the first one by default"); + } + + try { + resourceFileName = resourceInfos.get(0).getResourceName(); + sqlContent = FileUtils.readFileToString( + new File(String.format("%s/%s", taskExecutionContext.getExecutePath(), resourceFileName)), + StandardCharsets.UTF_8); + } catch (IOException e) { + log.error("read sql content from file {} error ", resourceFileName, e); + throw new TaskException("read sql content error", e); + } + } else { + sqlContent = sparkParameters.getRawScript(); + } + args.add(generateScriptFile(sqlContent)); } return args; } @@ -229,7 +252,7 @@ public class SparkTask extends AbstractYarnTask { } } - private String generateScriptFile() { + private String generateScriptFile(String sqlContent) { String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId()); @@ -237,10 +260,9 @@ public class SparkTask extends AbstractYarnTask { Path path = file.toPath(); if (!Files.exists(path)) { - String script = replaceParam(sparkParameters.getRawScript()); - sparkParameters.setRawScript(script); + String script = replaceParam(sqlContent); - log.info("raw script : {}", sparkParameters.getRawScript()); + log.info("raw script : {}", script); log.info("task execute path : {}", taskExecutionContext.getExecutePath()); Set perms = PosixFilePermissions.fromString(RWXR_XR_X); @@ -254,7 +276,7 @@ public class SparkTask extends AbstractYarnTask { } Files.createFile(path, attr); } - Files.write(path, sparkParameters.getRawScript().getBytes(), StandardOpenOption.APPEND); + Files.write(path, script.getBytes(), StandardOpenOption.APPEND); } catch (IOException e) { throw new RuntimeException("generate spark sql script error", e); } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 436a898f60..042d4c150a 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -718,6 +718,9 @@ export default { zeppelin_username_tips: 'Please enter the zeppelin server username', zeppelin_password: 'zeppelinPassword', zeppelin_password_tips: 'Please enter the zeppelin server password', + sql_execution_type: 'SQL Input', + sql_execution_type_from_file: 'FROM_FILE', + sql_execution_type_from_script: 'FROM_SCRIPT', hive_cli_task_execution_type: 'Hive Cli Task Execution Type', hive_sql_script: 'Hive SQL Script', hive_cli_options: 'Hive Cli Options', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 02699aa149..9d6d38690e 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -703,6 +703,9 @@ export default { zeppelin_username_tips: '请输入zeppelin server的登陆用户名', zeppelin_password: 'zeppelinPassword', zeppelin_password_tips: '请输入zeppelin server的登陆密码', + sql_execution_type: 'SQL来源', + sql_execution_type_from_file: '选择资源中心文件', + sql_execution_type_from_script: '脚本输入', hive_cli_task_execution_type: 'Hive Cli 任务类型', hive_sql_script: 'Hive SQL 脚本', hive_cli_options: 'Hive Cli 选项', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-hive-cli.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-hive-cli.ts index d67a3ffaa8..4a7ae3da14 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-hive-cli.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-hive-cli.ts @@ -25,18 +25,29 @@ export function useHiveCli(model: { [field: string]: any }): IJsonItem[] { model.hiveCliTaskExecutionType === 'SCRIPT' ? 24 : 0 ) const resourcesRequired = ref( - model.hiveCliTaskExecutionType === 'SCRIPT' ? false : true + model.hiveCliTaskExecutionType !== 'SCRIPT' ) const resourcesLimit = computed(() => model.hiveCliTaskExecutionType === 'SCRIPT' ? -1 : 1 ) + const SQL_EXECUTION_TYPES = [ + { + label: t('project.node.sql_execution_type_from_script'), + value: 'SCRIPT' + }, + { + label: t('project.node.sql_execution_type_from_file'), + value: 'FILE' + } + ] + watch( () => model.hiveCliTaskExecutionType, () => { resourcesRequired.value = - model.hiveCliTaskExecutionType === 'SCRIPT' ? false : true + model.hiveCliTaskExecutionType !== 'SCRIPT' } ) @@ -45,8 +56,8 @@ export function useHiveCli(model: { [field: string]: any }): IJsonItem[] { type: 'select', field: 'hiveCliTaskExecutionType', span: 12, - name: t('project.node.hive_cli_task_execution_type'), - options: HIVE_CLI_TASK_EXECUTION_TYPES, + name: t('project.node.sql_execution_type'), + options: SQL_EXECUTION_TYPES, validate: { trigger: ['input', 'blur'], required: true @@ -77,14 +88,3 @@ export function useHiveCli(model: { [field: string]: any }): IJsonItem[] { ...useCustomParams({ model, field: 'localParams', isSimple: false }) ] } - -export const HIVE_CLI_TASK_EXECUTION_TYPES = [ - { - label: 'FROM_SCRIPT', - value: 'SCRIPT' - }, - { - label: 'FROM_FILE', - value: 'FILE' - } -] diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts index 72e442f9fc..f43f68b218 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { computed, ref } from 'vue' +import { computed, ref, watch } from 'vue' import { useI18n } from 'vue-i18n' import { useCustomParams, @@ -39,10 +39,39 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] { const mainArgsSpan = computed(() => (model.programType === 'SQL' ? 0 : 24)) - const rawScriptSpan = computed(() => (model.programType === 'SQL' ? 24 : 0)) + const rawScriptSpan = computed(() => (model.programType === 'SQL' && model.sqlExecutionType === 'SCRIPT' ? 24 : 0)) const showCluster = computed(() => model.programType !== 'SQL') + const resourcesRequired = ref( + model.programType === 'SQL' && model.sqlExecutionType === 'FILE' + ) + + const resourcesLimit = computed(() => + model.programType === 'SQL' && model.sqlExecutionType === 'FILE' ? 1 : -1 + ) + + const sqlExecutionTypeSpan = computed(() => (model.programType === 'SQL' ? 12 : 0)) + + const SQL_EXECUTION_TYPES = [ + { + label: t('project.node.sql_execution_type_from_script'), + value: 'SCRIPT' + }, + { + label: t('project.node.sql_execution_type_from_file'), + value: 'FILE' + } + ] + + watch( + () => [model.sqlExecutionType, model.programType], + () => { + resourcesRequired.value = + model.programType === 'SQL' && model.sqlExecutionType === 'FILE' + } + ) + return [ { type: 'select', @@ -57,6 +86,17 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] { } } }, + { + type: 'select', + field: 'sqlExecutionType', + span: sqlExecutionTypeSpan, + name: t('project.node.sql_execution_type'), + options: SQL_EXECUTION_TYPES, + validate: { + trigger: ['input', 'blur'], + required: true + } + }, { type: 'input', field: 'mainClass', @@ -85,6 +125,9 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] { field: 'rawScript', span: rawScriptSpan, name: t('project.node.script'), + props: { + language: 'sql' + }, validate: { trigger: ['input', 'trigger'], required: true, @@ -126,7 +169,7 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] { placeholder: t('project.node.option_parameters_tips') } }, - useResources(), + useResources(24, resourcesRequired, resourcesLimit), ...useCustomParams({ model, field: 'localParams', isSimple: false }) ] } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index dd3b4ce1d4..760a2139b9 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -71,6 +71,7 @@ export function formatParams(data: INodeData): { taskParams.numExecutors = data.numExecutors taskParams.executorMemory = data.executorMemory taskParams.executorCores = data.executorCores + taskParams.sqlExecutionType = data.sqlExecutionType } if (data.taskType === 'FLINK' || data.taskType === 'FLINK_STREAM') { @@ -325,7 +326,8 @@ export function formatParams(data: INodeData): { executorMemory: data.executorMemory, numExecutors: data.numExecutors, others: data.others, - yarnQueue: data.yarnQueue + yarnQueue: data.yarnQueue, + sqlExecutionType: data.sqlExecutionType } } @@ -726,6 +728,7 @@ export function formatModel(data: ITaskData) { params.executorMemory = data.taskParams.sparkParameters.executorMemory params.numExecutors = data.taskParams.sparkParameters.numExecutors params.others = data.taskParams.sparkParameters.others + params.sqlExecutionType = data.taskParams.sparkParameters.sqlExecutionType } if (data.taskParams?.conditionResult?.successNode?.length) { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts index 0fc42445c4..15f7ec508f 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts @@ -52,7 +52,8 @@ export function useSpark({ executorMemory: '2G', executorCores: 2, yarnQueue: '', - timeoutNotifyStrategy: ['WARN'] + timeoutNotifyStrategy: ['WARN'], + sqlExecutionType: 'SCRIPT' } as INodeData) return { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 67d97a5b19..84b6375a01 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -224,6 +224,7 @@ interface ISparkParameters { numExecutors?: number others?: string yarnQueue?: string + sqlExecutionType?: string } interface IRuleParameters { @@ -348,6 +349,7 @@ interface ITaskParams { hiveCliOptions?: string hiveSqlScript?: string hiveCliTaskExecutionType?: string + sqlExecutionType?: string noteId?: string paragraphId?: string condaEnvName?: string