diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java index 5064b1f394..f0b245215a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java @@ -22,9 +22,10 @@ package org.apache.dolphinscheduler.common.enums; */ public enum ProgramType { /** - * 0 JAVA,1 SCALA,2 PYTHON + * 0 JAVA,1 SCALA,2 PYTHON,3 SQL */ JAVA, SCALA, - PYTHON + PYTHON, + SQL } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java index ba5bcdd511..0092b31b77 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java @@ -24,9 +24,11 @@ public enum SparkVersion { /** * 0 SPARK1 * 1 SPARK2 + * 2 SPARKSQL */ SPARK1(0, "SPARK1"), - SPARK2(1, "SPARK2"); + SPARK2(1, "SPARK2"), + SPARKSQL(2, "SPARKSQL"); SparkVersion(int code, String descp) { this.code = code; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java index 860d5c13c7..aff6d253ab 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java @@ -37,8 +37,8 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor { public AbstractYarnTask(TaskExecutionContext taskRequest) { super(taskRequest); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskRequest, - logger); + taskRequest, + logger); } @Override @@ -73,7 +73,6 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor { * create command * * @return String - * @throws Exception exception */ protected abstract String buildCommand(); @@ -94,8 +93,8 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor { } return mainJar.getId() == 0 - ? mainJar.getRes() - // when update resource maybe has error - : mainJar.getResourceName().replaceFirst("/", ""); + ? mainJar.getRes() + // when update resource maybe has error + : mainJar.getResourceName().replaceFirst("/", ""); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java index 05b15118d0..e26a2ffd03 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java @@ -23,9 +23,10 @@ package org.apache.dolphinscheduler.plugin.task.spark; public enum ProgramType { /** - * 0 JAVA,1 SCALA,2 PYTHON + * 0 JAVA,1 SCALA,2 PYTHON,3 SQL */ JAVA, SCALA, - PYTHON + PYTHON, + SQL } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkArgsUtils.java deleted file mode 100644 index 1a5c662118..0000000000 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkArgsUtils.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.task.spark; - -import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; -import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils; -import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import java.util.ArrayList; -import java.util.List; - -/** - * spark args utils - */ -public class SparkArgsUtils { - - private static final String SPARK_CLUSTER = "cluster"; - - private static final String SPARK_LOCAL = "local"; - - private static final String SPARK_ON_YARN = "yarn"; - - private SparkArgsUtils() { - throw new IllegalStateException("Utility class"); - } - - /** - * build args - * - * @param param param - * @return argument list - */ - public static List buildArgs(SparkParameters param) { - List args = new ArrayList<>(); - args.add(SparkConstants.MASTER); - - String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER; - if (!SPARK_LOCAL.equals(deployMode)) { - args.add(SPARK_ON_YARN); - args.add(SparkConstants.DEPLOY_MODE); - } - args.add(deployMode); - - ProgramType programType = param.getProgramType(); - String mainClass = param.getMainClass(); - if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { - args.add(SparkConstants.MAIN_CLASS); - args.add(mainClass); - } - - int driverCores = param.getDriverCores(); - if (driverCores > 0) { - args.add(SparkConstants.DRIVER_CORES); - args.add(String.format("%d", driverCores)); - } - - String driverMemory = param.getDriverMemory(); - if (StringUtils.isNotEmpty(driverMemory)) { - args.add(SparkConstants.DRIVER_MEMORY); - args.add(driverMemory); - } - - int numExecutors = param.getNumExecutors(); - if (numExecutors > 0) { - args.add(SparkConstants.NUM_EXECUTORS); - args.add(String.format("%d", numExecutors)); - } - - int executorCores = param.getExecutorCores(); - if (executorCores > 0) { - args.add(SparkConstants.EXECUTOR_CORES); - args.add(String.format("%d", executorCores)); - } - - String executorMemory = param.getExecutorMemory(); - if (StringUtils.isNotEmpty(executorMemory)) { - args.add(SparkConstants.EXECUTOR_MEMORY); - args.add(executorMemory); - } - - String appName = param.getAppName(); - if (StringUtils.isNotEmpty(appName)) { - args.add(SparkConstants.SPARK_NAME); - args.add(ArgsUtils.escape(appName)); - } - - String others = param.getOthers(); - if (!SPARK_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) { - String queue = param.getQueue(); - if (StringUtils.isNotEmpty(queue)) { - args.add(SparkConstants.SPARK_QUEUE); - args.add(queue); - } - } - - // --conf --files --jars --packages - if (StringUtils.isNotEmpty(others)) { - args.add(others); - } - - ResourceInfo mainJar = param.getMainJar(); - if (mainJar != null) { - args.add(mainJar.getRes()); - } - - String mainArgs = param.getMainArgs(); - if (StringUtils.isNotEmpty(mainArgs)) { - args.add(mainArgs); - } - - return args; - } - -} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java index dc6335cce0..1dacddbf83 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java @@ -40,6 +40,8 @@ public class SparkConstants { public static final String DEPLOY_MODE = "--deploy-mode"; + public static final String DEPLOY_MODE_LOCAL = "local"; + /** * --driver-cores NUM */ @@ -55,6 +57,8 @@ public class SparkConstants { */ public static final String MASTER = "--master"; + public static final String SPARK_ON_YARN = "yarn"; + /** * --num-executors NUM */ @@ -70,4 +74,9 @@ public class SparkConstants { */ public static final String EXECUTOR_MEMORY = "--executor-memory"; + /** + * -f SQL from files + */ + public static final String SQL_FROM_FILE = "-f"; + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java index 44a3d0cc81..78aed34af1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java @@ -91,7 +91,7 @@ public class SparkParameters extends AbstractParameters { /** * program type - * 0 JAVA,1 SCALA,2 PYTHON + * 0 JAVA,1 SCALA,2 PYTHON,3 SQL */ private ProgramType programType; @@ -100,6 +100,11 @@ public class SparkParameters extends AbstractParameters { */ private String sparkVersion; + /** + * spark sql script + */ + private String rawScript; + /** * resource list */ @@ -225,9 +230,22 @@ public class SparkParameters extends AbstractParameters { this.sparkVersion = sparkVersion; } + public String getRawScript() { + return rawScript; + } + + public void setRawScript(String rawScript) { + this.rawScript = rawScript; + } + @Override public boolean checkParameters() { - return mainJar != null && programType != null; + /** + * When saving a task, the parameters cannot be empty and mainJar or rawScript cannot be empty: + * (1) When ProgramType is SQL, rawScript cannot be empty. + * (2) When ProgramType is Java/Scala/Python, mainJar cannot be empty. + */ + return programType != null && (mainJar != null || rawScript != null); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index 54073e8fdd..abe3827858 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.plugin.task.spark; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; + import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; @@ -24,13 +26,25 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; public class SparkTask extends AbstractYarnTask { @@ -65,32 +79,45 @@ public class SparkTask extends AbstractYarnTask { throw new RuntimeException("spark task params is not valid"); } sparkParameters.setQueue(taskExecutionContext.getQueue()); - setMainJarName(); + + if (sparkParameters.getProgramType() != ProgramType.SQL) { + setMainJarName(); + } } /** * create command + * * @return command */ @Override protected String buildCommand() { - // spark-submit [options] [app arguments] + /** + * (1) spark-submit [options] [app arguments] + * (2) spark-sql [options] -f + */ List args = new ArrayList<>(); // spark version String sparkCommand = SparkVersion.SPARK2.getCommand(); + // If the programType is non-SQL, execute bin/spark-submit if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) { sparkCommand = SparkVersion.SPARK1.getCommand(); } + // If the programType is SQL, execute bin/spark-sql + if (sparkParameters.getProgramType() == ProgramType.SQL) { + sparkCommand = SparkVersion.SPARKSQL.getCommand(); + } + args.add(sparkCommand); - // other parameters - args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); + // populate spark options + args.addAll(populateSparkOptions()); // replace placeholder, and combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); + Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); if (MapUtils.isEmpty(paramsMap)) { paramsMap = new HashMap<>(); } @@ -105,6 +132,134 @@ public class SparkTask extends AbstractYarnTask { return command; } + /** + * build spark options + * + * @return argument list + */ + private List populateSparkOptions() { + List args = new ArrayList<>(); + args.add(SparkConstants.MASTER); + + String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode() : SparkConstants.DEPLOY_MODE_LOCAL; + if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) { + args.add(SparkConstants.SPARK_ON_YARN); + args.add(SparkConstants.DEPLOY_MODE); + } + args.add(deployMode); + + ProgramType programType = sparkParameters.getProgramType(); + String mainClass = sparkParameters.getMainClass(); + if (programType != ProgramType.PYTHON && programType != ProgramType.SQL && StringUtils.isNotEmpty(mainClass)) { + args.add(SparkConstants.MAIN_CLASS); + args.add(mainClass); + } + + populateSparkResourceDefinitions(args); + + String appName = sparkParameters.getAppName(); + if (StringUtils.isNotEmpty(appName)) { + args.add(SparkConstants.SPARK_NAME); + args.add(ArgsUtils.escape(appName)); + } + + String others = sparkParameters.getOthers(); + if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) { + String queue = sparkParameters.getQueue(); + if (StringUtils.isNotEmpty(queue)) { + args.add(SparkConstants.SPARK_QUEUE); + args.add(queue); + } + } + + // --conf --files --jars --packages + if (StringUtils.isNotEmpty(others)) { + args.add(others); + } + + ResourceInfo mainJar = sparkParameters.getMainJar(); + if (programType != ProgramType.SQL) { + args.add(mainJar.getRes()); + } + + String mainArgs = sparkParameters.getMainArgs(); + if (programType != ProgramType.SQL && StringUtils.isNotEmpty(mainArgs)) { + args.add(mainArgs); + } + + // bin/spark-sql -f fileName + if (ProgramType.SQL == programType) { + args.add(SparkConstants.SQL_FROM_FILE); + args.add(generateScriptFile()); + } + return args; + } + + private void populateSparkResourceDefinitions(List args) { + int driverCores = sparkParameters.getDriverCores(); + if (driverCores > 0) { + args.add(SparkConstants.DRIVER_CORES); + args.add(String.format("%d", driverCores)); + } + + String driverMemory = sparkParameters.getDriverMemory(); + if (StringUtils.isNotEmpty(driverMemory)) { + args.add(SparkConstants.DRIVER_MEMORY); + args.add(driverMemory); + } + + int numExecutors = sparkParameters.getNumExecutors(); + if (numExecutors > 0) { + args.add(SparkConstants.NUM_EXECUTORS); + args.add(String.format("%d", numExecutors)); + } + + int executorCores = sparkParameters.getExecutorCores(); + if (executorCores > 0) { + args.add(SparkConstants.EXECUTOR_CORES); + args.add(String.format("%d", executorCores)); + } + + String executorMemory = sparkParameters.getExecutorMemory(); + if (StringUtils.isNotEmpty(executorMemory)) { + args.add(SparkConstants.EXECUTOR_MEMORY); + args.add(executorMemory); + } + } + + private String generateScriptFile() { + String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId()); + + File file = new File(scriptFileName); + Path path = file.toPath(); + + if (!Files.exists(path)) { + String script = sparkParameters.getRawScript().replaceAll("\\r\\n", "\n"); + sparkParameters.setRawScript(script); + + logger.info("raw script : {}", sparkParameters.getRawScript()); + logger.info("task execute path : {}", taskExecutionContext.getExecutePath()); + + Set perms = PosixFilePermissions.fromString(RWXR_XR_X); + FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); + try { + if (OSUtils.isWindows()) { + Files.createFile(path); + } else { + if (!file.getParentFile().exists()) { + file.getParentFile().mkdirs(); + } + Files.createFile(path, attr); + } + Files.write(path, sparkParameters.getRawScript().getBytes(), StandardOpenOption.APPEND); + } catch (IOException e) { + throw new RuntimeException("generate spark sql script error", e); + } + + } + return scriptFileName; + } + @Override protected void setMainJarName() { // main jar diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java index 36398c7456..02c357914b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java @@ -22,10 +22,11 @@ public enum SparkVersion { /** * 0 SPARK1 * 1 SPARK2 + * 2 SPARKSQL */ SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"), SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"), - ; + SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql"); private final int code; private final String descp; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java new file mode 100644 index 0000000000..17c2ff0c4b --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.spark; + +import java.util.Collections; + +import org.apache.commons.lang.StringUtils; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.powermock.api.mockito.PowerMockito.spy; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + JSONUtils.class +}) +@PowerMockIgnore({"javax.*"}) + +public class SparkTaskTest { + + @Test + public void testBuildCommandWithSparkSql() throws Exception { + String parameters = buildSparkParametersWithSparkSql(); + TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + when(taskExecutionContext.getTaskParams()).thenReturn(parameters); + when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); + when(taskExecutionContext.getTaskAppId()).thenReturn("5536"); + + SparkTask sparkTask = spy(new SparkTask(taskExecutionContext)); + sparkTask.init(); + Assert.assertEquals(sparkTask.buildCommand(), + "${SPARK_HOME2}/bin/spark-sql " + + "--master yarn " + + "--deploy-mode client " + + "--driver-cores 1 " + + "--driver-memory 512M " + + "--num-executors 2 " + + "--executor-cores 2 " + + "--executor-memory 1G " + + "--name sparksql " + + "-f /tmp/5536_node.sql"); + } + + private String buildSparkParametersWithSparkSql() { + SparkParameters sparkParameters = new SparkParameters(); + sparkParameters.setLocalParams(Collections.emptyList()); + sparkParameters.setRawScript("selcet 11111;"); + sparkParameters.setProgramType(ProgramType.SQL); + sparkParameters.setMainClass(StringUtils.EMPTY); + sparkParameters.setDeployMode("client"); + sparkParameters.setAppName("sparksql"); + sparkParameters.setOthers(StringUtils.EMPTY); + sparkParameters.setSparkVersion("SPARK2"); + sparkParameters.setDriverCores(1); + sparkParameters.setDriverMemory("512M"); + sparkParameters.setNumExecutors(2); + sparkParameters.setExecutorMemory("1G"); + sparkParameters.setExecutorCores(2); + return JSONUtils.toJsonString(sparkParameters); + } + +} diff --git a/dolphinscheduler-ui-next/src/service/modules/resources/types.ts b/dolphinscheduler-ui-next/src/service/modules/resources/types.ts index 43a532d4e7..71edfa4943 100644 --- a/dolphinscheduler-ui-next/src/service/modules/resources/types.ts +++ b/dolphinscheduler-ui-next/src/service/modules/resources/types.ts @@ -66,7 +66,7 @@ interface OnlineCreateReq extends CreateReq, ContentReq { } interface ProgramTypeReq { - programType: 'JAVA' | 'SCALA' | 'PYTHON' + programType: 'JAVA' | 'SCALA' | 'PYTHON' | 'SQL' } interface ListReq { diff --git a/dolphinscheduler-ui-next/src/store/project/types.ts b/dolphinscheduler-ui-next/src/store/project/types.ts index 99a0e67d17..147a84503e 100644 --- a/dolphinscheduler-ui-next/src/store/project/types.ts +++ b/dolphinscheduler-ui-next/src/store/project/types.ts @@ -18,7 +18,7 @@ import type { EditWorkflowDefinition } from '@/views/projects/workflow/components/dag/types' import type { IOption } from '@/components/form/types' -type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON' +type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON' | 'SQL' type DependentResultType = { [key: string]: 'SUCCESS' | 'WAITING_THREAD' | 'FAILURE' } diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts index 13f812132d..8f87d1cfb2 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts @@ -14,34 +14,49 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { useI18n } from 'vue-i18n' -import type { IJsonItem } from '../types' +import {ref, watchEffect} from 'vue' +import {useI18n} from 'vue-i18n' +import type {IJsonItem, IOption} from '../types' -export function useDeployMode(span = 24, showClient = true): IJsonItem { - const { t } = useI18n() +export function useDeployMode(span = 24, showClient = ref(true), showCluster = ref(true)): IJsonItem { + const {t} = useI18n() - return { - type: 'radio', - field: 'deployMode', - name: t('project.node.deploy_mode'), - options: DEPLOY_MODES.filter((option) => - option.value === 'client' ? showClient : true - ), - span - } + const deployModeOptions = ref(DEPLOY_MODES as IOption[]) + + watchEffect( + () => { + deployModeOptions.value = DEPLOY_MODES.filter((option) => { + switch (option.value) { + case 'cluster': + return showCluster.value + case 'client': + return showClient.value + default: + return true + } + }) + } + ) + return { + type: 'radio', + field: 'deployMode', + name: t('project.node.deploy_mode'), + options: deployModeOptions, + span + } } export const DEPLOY_MODES = [ - { - label: 'cluster', - value: 'cluster' - }, - { - label: 'client', - value: 'client' - }, - { - label: 'local', - value: 'local' - } + { + label: 'cluster', + value: 'cluster' + }, + { + label: 'client', + value: 'client' + }, + { + label: 'local', + value: 'local' + } ] diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts index d1e138211f..9b2170ec90 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { computed } from 'vue' +import { computed, ref } from 'vue' import { useI18n } from 'vue-i18n' import { useCustomParams, useDeployMode, useMainJar, useResources } from '.' import type { IJsonItem } from '../types' @@ -68,7 +68,7 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] { } }, useMainJar(model), - useDeployMode(24, false), + useDeployMode(24, ref(false)), { type: 'select', field: 'flinkVersion', diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts index 41356b07d6..e8008a6a3f 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts @@ -14,8 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -import { ref, onMounted, watch } from 'vue' +import { computed, ref, onMounted, watch } from 'vue' import { useI18n } from 'vue-i18n' import { queryResourceByProgramType } from '@/service/modules/resources' import { useTaskNodeStore } from '@/store/project/task-node' @@ -27,6 +26,9 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem { const mainJarOptions = ref([] as IMainJar[]) const taskStore = useTaskNodeStore() + const mainJarSpan = computed(() => + model.programType === 'SQL' ? 0 : 24 + ) const getMainJars = async (programType: ProgramType) => { const storeMainJar = taskStore.getMainJar(programType) if (storeMainJar) { @@ -57,6 +59,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem { type: 'tree-select', field: 'mainJar', name: t('project.node.main_package'), + span: mainJarSpan, props: { cascade: true, showPath: true, @@ -67,7 +70,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem { }, validate: { trigger: ['input', 'blur'], - required: true, + required: model.programType !== 'SQL', validator(validate: any, value: string) { if (!value) { return new Error(t('project.node.main_package_tips')) diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts index ea64355072..76b89b342d 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts @@ -16,7 +16,6 @@ */ import { computed } from 'vue' import { useI18n } from 'vue-i18n' -import { PROGRAM_TYPES } from './use-spark' import { useCustomParams, useMainJar, useResources } from '.' import type { IJsonItem } from '../types' @@ -24,7 +23,7 @@ export function useMr(model: { [field: string]: any }): IJsonItem[] { const { t } = useI18n() const mainClassSpan = computed(() => - model.programType === 'PYTHON' ? 0 : 24 + (model.programType === 'PYTHON' || model.programType === 'SQL') ? 0 : 24 ) return [ @@ -91,3 +90,18 @@ export function useMr(model: { [field: string]: any }): IJsonItem[] { ...useCustomParams({ model, field: 'localParams', isSimple: true }) ] } + +export const PROGRAM_TYPES = [ + { + label: 'JAVA', + value: 'JAVA' + }, + { + label: 'SCALA', + value: 'SCALA' + }, + { + label: 'PYTHON', + value: 'PYTHON' + } +] diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts index fe33b1cd67..b28aa359ac 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { computed } from 'vue' +import { computed, ref } from 'vue' import { useI18n } from 'vue-i18n' import { useCustomParams, @@ -32,7 +32,19 @@ import type { IJsonItem } from '../types' export function useSpark(model: { [field: string]: any }): IJsonItem[] { const { t } = useI18n() const mainClassSpan = computed(() => - model.programType === 'PYTHON' ? 0 : 24 + (model.programType === 'PYTHON' || model.programType === 'SQL') ? 0 : 24 + ) + + const mainArgsSpan = computed(() => + model.programType === 'SQL' ? 0 : 24 + ) + + const rawScriptSpan = computed(() => + model.programType === 'SQL' ? 24 : 0 + ) + + const showCluster = computed(() => + model.programType !== 'SQL' ) return [ @@ -66,16 +78,27 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] { }, validate: { trigger: ['input', 'blur'], - required: model.programType !== 'PYTHON', + required: model.programType !== 'PYTHON' && model.programType !== 'SQL', validator(validate: any, value: string) { - if (model.programType !== 'PYTHON' && !value) { + if (model.programType !== 'PYTHON' && !value && model.programType !== 'SQL') { return new Error(t('project.node.main_class_tips')) } } } }, useMainJar(model), - useDeployMode(), + { + type: 'editor', + field: 'rawScript', + span: rawScriptSpan, + name: t('project.node.script'), + validate: { + trigger: ['input', 'trigger'], + required: true, + message: t('project.node.script_tips') + } + }, + useDeployMode(24, ref(true), showCluster), { type: 'input', field: 'appName', @@ -92,6 +115,7 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] { { type: 'input', field: 'mainArgs', + span: mainArgsSpan, name: t('project.node.main_arguments'), props: { type: 'textarea', @@ -124,6 +148,10 @@ export const PROGRAM_TYPES = [ { label: 'PYTHON', value: 'PYTHON' + }, + { + label: 'SQL', + value: 'SQL' } ] diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts index 520a5481dc..46061ffc9c 100644 --- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts +++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts @@ -45,7 +45,8 @@ export function useSpark({ timeout: 30, programType: 'SCALA', sparkVersion: 'SPARK2', - deployMode: 'cluster', + rawScript: '', + deployMode: 'local', driverCores: 1, driverMemory: '512M', numExecutors: 2,