Browse Source

[Feature-4050][server] Spark task support udv inject (#4061)

* #4050 spark task support udv inject

* modify spark task UT

* modify sparkTaskExecutionCtx

* add exp for spark task get main jar method
pull/3/MERGE
Yelli 4 years ago committed by GitHub
parent
commit
1c5be9acf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
  2. 192
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
  3. 207
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java

@ -14,24 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* spark args utils
* spark args utils
*/
public class SparkArgsUtils {
private static final String SPARK_CLUSTER = "cluster";
private static final String SPARK_LOCAL = "local";
private static final String SPARK_ON_YARN = "yarn";
/**
* build args
*
@ -40,15 +45,15 @@ public class SparkArgsUtils {
*/
public static List<String> buildArgs(SparkParameters param) {
List<String> args = new ArrayList<>();
String deployMode = "cluster";
String deployMode = SPARK_CLUSTER;
args.add(Constants.MASTER);
if(StringUtils.isNotEmpty(param.getDeployMode())){
if (StringUtils.isNotEmpty(param.getDeployMode())) {
deployMode = param.getDeployMode();
}
if(!"local".equals(deployMode)){
args.add("yarn");
if (!SPARK_LOCAL.equals(deployMode)) {
args.add(SPARK_ON_YARN);
args.add(Constants.DEPLOY_MODE);
}
@ -56,7 +61,7 @@ public class SparkArgsUtils {
ProgramType type = param.getProgramType();
String mainClass = param.getMainClass();
if(type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)){
if (type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(Constants.MAIN_CLASS);
args.add(mainClass);
}
@ -96,14 +101,14 @@ public class SparkArgsUtils {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(others)) {
if(!others.contains(Constants.SPARK_QUEUE) && StringUtils.isNotEmpty(queue)){
if (!others.contains(Constants.SPARK_QUEUE) && StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);
}
args.add(others);
}else if (StringUtils.isNotEmpty(queue)) {
} else if (StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);

192
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -22,133 +23,136 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
/**
* spark task
*/
public class SparkTask extends AbstractYarnTask {
/**
* spark1 command
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
/**
* spark1 command
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
/**
* spark2 command
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
/**
* spark parameters
*/
private SparkParameters sparkParameters;
/**
* taskExecutionContext
*/
private final TaskExecutionContext sparkTaskExecutionContext;
public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.sparkTaskExecutionContext = taskExecutionContext;
}
/**
* spark2 command
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
@Override
public void init() {
/**
* spark parameters
*/
private SparkParameters sparkParameters;
logger.info("spark task params {}", sparkTaskExecutionContext.getTaskParams());
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
sparkParameters = JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(), SparkParameters.class);
public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
}
if (null == sparkParameters) {
logger.error("Spark params is null");
return;
}
@Override
public void init() {
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(sparkTaskExecutionContext.getQueue());
setMainJarName();
}
logger.info("spark task params {}", taskExecutionContext.getTaskParams());
/**
* create command
*
* @return command
*/
@Override
protected String buildCommand() {
List<String> args = new ArrayList<>();
sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class);
//spark version
String sparkCommand = SPARK2_COMMAND;
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(taskExecutionContext.getQueue());
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SPARK1_COMMAND;
}
setMainJarName();
args.add(sparkCommand);
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();
// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()),
sparkTaskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()),
sparkTaskExecutionContext.getScheduleTime());
if (paramsMap != null ){
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
}
sparkParameters.setMainArgs(args);
}
}
String command = null;
/**
* create command
* @return command
*/
@Override
protected String buildCommand() {
List<String> args = new ArrayList<>();
if (null != paramsMap) {
command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
}
//spark version
String sparkCommand = SPARK2_COMMAND;
logger.info("spark task command: {}", command);
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SPARK1_COMMAND;
return command;
}
args.add(sparkCommand);
// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
logger.info("spark task command : {}", command);
return command;
}
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = sparkParameters.getMainJar();
if (mainJar != null) {
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
@Override
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = sparkParameters.getMainJar();
if (null == mainJar) {
throw new RuntimeException("Spark task jar params is null");
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
sparkParameters.setMainJar(mainJar);
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId());
if (resource == null) {
logger.error("resource id: {} not exist", resourceId);
throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
}
resourceName = resource.getFullName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);
sparkParameters.setMainJar(mainJar);
}
}
@Override
public AbstractParameters getParameters() {
return sparkParameters;
}
@Override
public AbstractParameters getParameters() {
return sparkParameters;
}
}

207
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java

@ -14,128 +14,121 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.PropertyPlaceholderHelper;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ParameterUtils.class, PlaceholderUtils.class, PropertyPlaceholderHelper.class})
public class SparkTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SparkTaskTest.class);
/**
* spark1 command
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
/**
* spark2 command
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
private TaskExecutionContext taskExecutionContext;
private ApplicationContext applicationContext;
private ProcessService processService;
private SparkTask spark2Task;
String spark1Params = "{"
+ "\"mainArgs\":\"\", "
+ "\"driverMemory\":\"1G\", "
+ "\"executorMemory\":\"2G\", "
+ "\"programType\":\"SCALA\", "
+ "\"mainClass\":\"basicetl.GlobalUserCar\", "
+ "\"driverCores\":\"2\", "
+ "\"deployMode\":\"cluster\", "
+ "\"executorCores\":2, "
+ "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, "
+ "\"sparkVersion\":\"SPARK1\", "
+ "\"numExecutors\":\"10\", "
+ "\"localParams\":[], "
+ "\"others\":\"\", "
+ "\"resourceList\":[]"
+ "}";
String spark2Params = "{"
+ "\"mainArgs\":\"\", "
+ "\"driverMemory\":\"1G\", "
+ "\"executorMemory\":\"2G\", "
+ "\"programType\":\"SCALA\", "
+ "\"mainClass\":\"basicetl.GlobalUserCar\", "
+ "\"driverCores\":\"2\", "
+ "\"deployMode\":\"cluster\", "
+ "\"executorCores\":2, "
+ "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, "
+ "\"sparkVersion\":\"SPARK2\", "
+ "\"numExecutors\":\"10\", "
+ "\"localParams\":[], "
+ "\"others\":\"\", "
+ "\"resourceList\":[]"
+ "}";
@Before
public void setTaskExecutionContext() {
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskParams(spark2Params);
taskExecutionContext.setQueue("dev");
taskExecutionContext.setTaskAppId(String.valueOf(System.currentTimeMillis()));
taskExecutionContext.setTenantCode("1");
taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setTaskTimeout(0);
processService = Mockito.mock(ProcessService.class);
applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
spark2Task = new SparkTask(taskExecutionContext, logger);
spark2Task.init();
}
@Test
public void testSparkTaskInit() {
TaskProps taskProps = new TaskProps();
String spark1Params = "{" +
"\"mainArgs\":\"\", " +
"\"driverMemory\":\"1G\", " +
"\"executorMemory\":\"2G\", " +
"\"programType\":\"SCALA\", " +
"\"mainClass\":\"basicetl.GlobalUserCar\", " +
"\"driverCores\":\"2\", " +
"\"deployMode\":\"cluster\", " +
"\"executorCores\":2, " +
"\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " +
"\"sparkVersion\":\"SPARK1\", " +
"\"numExecutors\":\"10\", " +
"\"localParams\":[], " +
"\"others\":\"\", " +
"\"resourceList\":[]" +
"}";
String spark2Params = "{" +
"\"mainArgs\":\"\", " +
"\"driverMemory\":\"1G\", " +
"\"executorMemory\":\"2G\", " +
"\"programType\":\"SCALA\", " +
"\"mainClass\":\"basicetl.GlobalUserCar\", " +
"\"driverCores\":\"2\", " +
"\"deployMode\":\"cluster\", " +
"\"executorCores\":2, " +
"\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " +
"\"sparkVersion\":\"SPARK2\", " +
"\"numExecutors\":\"10\", " +
"\"localParams\":[], " +
"\"others\":\"\", " +
"\"resourceList\":[]" +
"}";
taskProps.setTaskParams(spark2Params);
logger.info("spark task params {}", taskProps.getTaskParams());
SparkParameters sparkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), SparkParameters.class);
assert sparkParameters != null;
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(taskProps.getQueue());
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();
/**
* combining local and global parameters
*/
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null) {
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
}
sparkParameters.setMainArgs(args);
}
List<String> args = new ArrayList<>();
//spark version
String sparkCommand = SPARK2_COMMAND;
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SPARK1_COMMAND;
}
args.add(sparkCommand);
// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
String sparkArgs = String.join(" ", args);
logger.info("spark task command : {}", sparkArgs);
Assert.assertEquals(SPARK2_COMMAND, sparkArgs.split(" ")[0]);
TaskExecutionContext sparkTaskCtx = new TaskExecutionContext();
SparkTask sparkTask = new SparkTask(sparkTaskCtx, logger);
sparkTask.init();
sparkTask.getParameters();
Assert.assertNull(sparkTaskCtx.getTaskParams());
String spark2Command = spark2Task.buildCommand();
String spark2Expected = "${SPARK_HOME2}/bin/spark-submit --master yarn --deploy-mode cluster "
+ "--class basicetl.GlobalUserCar --driver-cores 2 --driver-memory 1G --num-executors 10 "
+ "--executor-cores 2 --executor-memory 2G --queue dev test-1.0-SNAPSHOT.jar";
Assert.assertEquals(spark2Expected, spark2Command);
taskExecutionContext.setTaskParams(spark1Params);
SparkTask spark1Task = new SparkTask(taskExecutionContext, logger);
spark1Task.init();
String spark1Command = spark1Task.buildCommand();
String spark1Expected = "${SPARK_HOME1}/bin/spark-submit --master yarn --deploy-mode cluster "
+ "--class basicetl.GlobalUserCar --driver-cores 2 --driver-memory 1G --num-executors 10 "
+ "--executor-cores 2 --executor-memory 2G --queue dev test-1.0-SNAPSHOT.jar";
Assert.assertEquals(spark1Expected, spark1Command);
}
}

Loading…
Cancel
Save