Squashed commit messages:
* [refactor] Dolphinscheduler sparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Refactor ui code and add sparksql test cases
* [refactor] refactor dolphinscheduler SparkSQL
* [refactor] refactor dolphinscheduler plugin-sparkSQL
* [refactor] refactor dolphinscheduler plugin-SparkSQL
* [refactor] dolphinscheduler plugin-SparkTaskTest
* [refactor] dolphinscheduler plugin-SparkTask
* [refactor] dolphinscheduler plugin-Spark
* [refactor] dolphinscheduler plugin-SparkTask-SparkSQL
* [refactor] dolphinscheduler plugin-spark-SparkTask
* [refactor] dolphinscheduler plugin-spark-SparkTask redefine code
sq-q authored 3 years ago, committed by GitHub
18 changed files with 393 additions and 188 deletions
@@ -1,129 +0,0 @@ (deleted file: SparkArgsUtils.java)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.spark;

import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * Spark args utils
 */
public class SparkArgsUtils {

    private static final String SPARK_CLUSTER = "cluster";

    private static final String SPARK_LOCAL = "local";

    private static final String SPARK_ON_YARN = "yarn";

    private SparkArgsUtils() {
        throw new IllegalStateException("Utility class");
    }

    /**
     * Build the spark-submit argument list from the task parameters.
     *
     * @param param spark task parameters
     * @return argument list
     */
    public static List<String> buildArgs(SparkParameters param) {
        List<String> args = new ArrayList<>();
        args.add(SparkConstants.MASTER);

        // default to cluster deploy mode; anything except "local" runs on YARN
        String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER;
        if (!SPARK_LOCAL.equals(deployMode)) {
            args.add(SPARK_ON_YARN);
            args.add(SparkConstants.DEPLOY_MODE);
        }
        args.add(deployMode);

        // --class only applies to JVM programs, never to Python
        ProgramType programType = param.getProgramType();
        String mainClass = param.getMainClass();
        if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
            args.add(SparkConstants.MAIN_CLASS);
            args.add(mainClass);
        }

        int driverCores = param.getDriverCores();
        if (driverCores > 0) {
            args.add(SparkConstants.DRIVER_CORES);
            args.add(String.format("%d", driverCores));
        }

        String driverMemory = param.getDriverMemory();
        if (StringUtils.isNotEmpty(driverMemory)) {
            args.add(SparkConstants.DRIVER_MEMORY);
            args.add(driverMemory);
        }

        int numExecutors = param.getNumExecutors();
        if (numExecutors > 0) {
            args.add(SparkConstants.NUM_EXECUTORS);
            args.add(String.format("%d", numExecutors));
        }

        int executorCores = param.getExecutorCores();
        if (executorCores > 0) {
            args.add(SparkConstants.EXECUTOR_CORES);
            args.add(String.format("%d", executorCores));
        }

        String executorMemory = param.getExecutorMemory();
        if (StringUtils.isNotEmpty(executorMemory)) {
            args.add(SparkConstants.EXECUTOR_MEMORY);
            args.add(executorMemory);
        }

        String appName = param.getAppName();
        if (StringUtils.isNotEmpty(appName)) {
            args.add(SparkConstants.SPARK_NAME);
            args.add(ArgsUtils.escape(appName));
        }

        // only set --queue when not running locally and the user has not already set one via "others"
        String others = param.getOthers();
        if (!SPARK_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) {
            String queue = param.getQueue();
            if (StringUtils.isNotEmpty(queue)) {
                args.add(SparkConstants.SPARK_QUEUE);
                args.add(queue);
            }
        }

        // --conf --files --jars --packages
        if (StringUtils.isNotEmpty(others)) {
            args.add(others);
        }

        ResourceInfo mainJar = param.getMainJar();
        if (mainJar != null) {
            args.add(mainJar.getRes());
        }

        String mainArgs = param.getMainArgs();
        if (StringUtils.isNotEmpty(mainArgs)) {
            args.add(mainArgs);
        }

        return args;
    }

}
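For orientation, here is a minimal sketch of how buildArgs could be exercised. This is a hypothetical driver, not part of the PR; it assumes it lives in the same package and uses the SparkParameters setters exercised in SparkTaskTest below, with an illustrative main class name:

    import java.util.List;

    // Hypothetical demo class, assumed to sit in org.apache.dolphinscheduler.plugin.task.spark
    public class SparkArgsUtilsDemo {
        public static void main(String[] unused) {
            SparkParameters param = new SparkParameters();
            param.setDeployMode("cluster");          // anything except "local" also emits "yarn"
            param.setProgramType(ProgramType.JAVA);
            param.setMainClass("org.example.Main");  // hypothetical main class
            param.setDriverCores(1);
            param.setDriverMemory("512M");
            param.setNumExecutors(2);
            param.setExecutorCores(2);
            param.setExecutorMemory("1G");
            param.setAppName("demo");

            List<String> args = SparkArgsUtils.buildArgs(param);
            // Expected shape: --master yarn --deploy-mode cluster --class org.example.Main
            // --driver-cores 1 --driver-memory 512M --num-executors 2 --executor-cores 2
            // --executor-memory 1G --name demo
            System.out.println(String.join(" ", args));
        }
    }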
@@ -0,0 +1,87 @@ (new file: SparkTaskTest.java)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.spark;

import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;

import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;

import java.util.Collections;

import org.apache.commons.lang.StringUtils;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest({
        JSONUtils.class
})
@PowerMockIgnore({"javax.*"})
public class SparkTaskTest {

    @Test
    public void testBuildCommandWithSparkSql() throws Exception {
        String parameters = buildSparkParametersWithSparkSql();
        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
        when(taskExecutionContext.getTaskAppId()).thenReturn("5536");

        SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
        sparkTask.init();
        // expected spark-sql command assembled from the parameters below
        Assert.assertEquals(
                "${SPARK_HOME2}/bin/spark-sql "
                        + "--master yarn "
                        + "--deploy-mode client "
                        + "--driver-cores 1 "
                        + "--driver-memory 512M "
                        + "--num-executors 2 "
                        + "--executor-cores 2 "
                        + "--executor-memory 1G "
                        + "--name sparksql "
                        + "-f /tmp/5536_node.sql",
                sparkTask.buildCommand());
    }

    private String buildSparkParametersWithSparkSql() {
        SparkParameters sparkParameters = new SparkParameters();
        sparkParameters.setLocalParams(Collections.emptyList());
        sparkParameters.setRawScript("select 11111;");
        sparkParameters.setProgramType(ProgramType.SQL);
        sparkParameters.setMainClass(StringUtils.EMPTY);
        sparkParameters.setDeployMode("client");
        sparkParameters.setAppName("sparksql");
        sparkParameters.setOthers(StringUtils.EMPTY);
        sparkParameters.setSparkVersion("SPARK2");
        sparkParameters.setDriverCores(1);
        sparkParameters.setDriverMemory("512M");
        sparkParameters.setNumExecutors(2);
        sparkParameters.setExecutorMemory("1G");
        sparkParameters.setExecutorCores(2);
        return JSONUtils.toJsonString(sparkParameters);
    }

}
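For readers tracing the assertion: this diff does not show SparkTask.buildCommand() itself, but the expected string implies its shape for SQL tasks, namely pick the spark-sql binary for the configured Spark version, append the resource and naming options, then pass the generated script via -f. A hedged sketch of that assembly follows; populateSparkOptions and the helper's signature are assumptions for illustration, not the plugin's actual API:

    // Hedged reconstruction of the command the test asserts against; illustrative only.
    private String buildSparkSqlCommandSketch(SparkParameters params, String execPath, String taskAppId) {
        List<String> cmd = new ArrayList<>();
        // sparkVersion "SPARK2" selects ${SPARK_HOME2}; ProgramType.SQL selects the spark-sql binary
        cmd.add("${SPARK_HOME2}/bin/spark-sql");
        // hypothetical helper: emits --master/--deploy-mode plus driver/executor sizing and --name
        cmd.addAll(populateSparkOptions(params));
        // SQL tasks run the generated script file instead of a main jar
        cmd.add("-f");
        cmd.add(execPath + "/" + taskAppId + "_node.sql"); // e.g. /tmp/5536_node.sql
        return String.join(" ", cmd);
    }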