Browse Source

Merge branch 'dev' of https://github.com/apache/incubator-dolphinscheduler into dev-kill-yarn-job

pull/3/MERGE
Eights-LI 4 years ago
parent
commit
d3397b8ddb
  1. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
  2. 64
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
  3. 207
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java

@ -14,24 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
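/**
* Utility class that builds the list of command-line arguments for a Spark job
* from a {@link SparkParameters} instance.
*/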
public class SparkArgsUtils {
private static final String SPARK_CLUSTER = "cluster";
private static final String SPARK_LOCAL = "local";
private static final String SPARK_ON_YARN = "yarn";
/**
* build args
*
@ -40,15 +45,15 @@ public class SparkArgsUtils {
*/
public static List<String> buildArgs(SparkParameters param) {
List<String> args = new ArrayList<>();
String deployMode = "cluster";
String deployMode = SPARK_CLUSTER;
args.add(Constants.MASTER);
if (StringUtils.isNotEmpty(param.getDeployMode())) {
deployMode = param.getDeployMode();
}
if(!"local".equals(deployMode)){
args.add("yarn");
if (!SPARK_LOCAL.equals(deployMode)) {
args.add(SPARK_ON_YARN);
args.add(Constants.DEPLOY_MODE);
}

64
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -22,20 +23,20 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
/**
* spark task
*/
@ -59,46 +60,35 @@ public class SparkTask extends AbstractYarnTask {
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
private final TaskExecutionContext sparkTaskExecutionContext;
/**
* Creates a Spark task for the given execution context.
*
* NOTE(review): this text comes from a rendered diff, so the two field assignments
* below are most likely the removed line and its replacement shown back to back;
* confirm against the real file which of them is kept.
*
* @param taskExecutionContext execution context handed to the superclass and kept on this task
* @param logger logger handed to the superclass
*/
public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.sparkTaskExecutionContext = taskExecutionContext;
}
/**
* Initializes the task from the JSON task parameters of the execution context.
*
* The parameters are parsed into {@link SparkParameters}; when parsing yields null an
* error is logged and the method returns without further setup. Otherwise the parameters
* are validated, the queue is copied from the execution context, the main jar name is
* resolved, and placeholders in the main arguments are replaced.
*
* NOTE(review): this text comes from a rendered diff. The duplicated log, parse and
* setQueue statements (one using taskExecutionContext, one using
* sparkTaskExecutionContext) look like old/new line pairs shown together; confirm
* against the real file which side is kept.
*
* @throws RuntimeException when {@code checkParameters()} reports the parameters as invalid
*/
@Override
public void init() {
logger.info("spark task params {}", taskExecutionContext.getTaskParams());
logger.info("spark task params {}", sparkTaskExecutionContext.getTaskParams());
sparkParameters = JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(), SparkParameters.class);
sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class);
// nothing to initialize when the params could not be parsed; only an error is logged
if (null == sparkParameters) {
logger.error("Spark params is null");
return;
}
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
// the queue is taken from the execution context, not from the task params JSON
sparkParameters.setQueue(taskExecutionContext.getQueue());
sparkParameters.setQueue(sparkTaskExecutionContext.getQueue());
setMainJarName();
// placeholder replacement is only attempted when main args are present
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// when no parameter map is produced the main args are stored back unchanged
if (paramsMap != null ){
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
}
sparkParameters.setMainArgs(args);
}
}
/**
* create command
*
* @return command
*/
@Override
@ -117,8 +107,18 @@ public class SparkTask extends AbstractYarnTask {
// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()),
sparkTaskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()),
sparkTaskExecutionContext.getScheduleTime());
String command = null;
if (null != paramsMap) {
command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
}
logger.info("spark task command: {}", command);
@ -129,7 +129,11 @@ public class SparkTask extends AbstractYarnTask {
protected void setMainJarName() {
// main jar
ResourceInfo mainJar = sparkParameters.getMainJar();
if (mainJar != null) {
if (null == mainJar) {
throw new RuntimeException("Spark task jar params is null");
}
int resourceId = mainJar.getId();
String resourceName;
if (resourceId == 0) {
@ -144,7 +148,7 @@ public class SparkTask extends AbstractYarnTask {
}
mainJar.setRes(resourceName);
sparkParameters.setMainJar(mainJar);
}
}
@Override

207
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java

@ -14,128 +14,121 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.PropertyPlaceholderHelper;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Tests for {@link SparkTask}: initialization from JSON task parameters and the
* command line produced for the SPARK1 and SPARK2 versions.
*
* NOTE(review): this text comes from a rendered diff. The test method below appears to
* contain both the previous TaskProps-based body and the newer buildCommand()-based
* assertions one after the other; confirm against the real file which part is kept.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ParameterUtils.class, PlaceholderUtils.class, PropertyPlaceholderHelper.class})
public class SparkTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SparkTaskTest.class);
/**
* spark1 command
*/
private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
/**
* spark2 command
*/
private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
// execution context shared by the tasks created in this test
private TaskExecutionContext taskExecutionContext;
// mocked Spring application context, set up before each test
private ApplicationContext applicationContext;
// mocked process service returned by the mocked application context
private ProcessService processService;
// task created and initialized from spark2Params before each test
private SparkTask spark2Task;
// task params JSON with sparkVersion SPARK1
String spark1Params = "{"
+ "\"mainArgs\":\"\", "
+ "\"driverMemory\":\"1G\", "
+ "\"executorMemory\":\"2G\", "
+ "\"programType\":\"SCALA\", "
+ "\"mainClass\":\"basicetl.GlobalUserCar\", "
+ "\"driverCores\":\"2\", "
+ "\"deployMode\":\"cluster\", "
+ "\"executorCores\":2, "
+ "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, "
+ "\"sparkVersion\":\"SPARK1\", "
+ "\"numExecutors\":\"10\", "
+ "\"localParams\":[], "
+ "\"others\":\"\", "
+ "\"resourceList\":[]"
+ "}";
// task params JSON with sparkVersion SPARK2; otherwise identical to spark1Params
String spark2Params = "{"
+ "\"mainArgs\":\"\", "
+ "\"driverMemory\":\"1G\", "
+ "\"executorMemory\":\"2G\", "
+ "\"programType\":\"SCALA\", "
+ "\"mainClass\":\"basicetl.GlobalUserCar\", "
+ "\"driverCores\":\"2\", "
+ "\"deployMode\":\"cluster\", "
+ "\"executorCores\":2, "
+ "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, "
+ "\"sparkVersion\":\"SPARK2\", "
+ "\"numExecutors\":\"10\", "
+ "\"localParams\":[], "
+ "\"others\":\"\", "
+ "\"resourceList\":[]"
+ "}";
/**
* Builds the execution context with the SPARK2 params and queue "dev", wires a mocked
* ProcessService behind a mocked ApplicationContext, and creates and initializes
* the SPARK2 task used by the test.
*/
@Before
public void setTaskExecutionContext() {
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskParams(spark2Params);
taskExecutionContext.setQueue("dev");
taskExecutionContext.setTaskAppId(String.valueOf(System.currentTimeMillis()));
taskExecutionContext.setTenantCode("1");
taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setTaskTimeout(0);
processService = Mockito.mock(ProcessService.class);
applicationContext = Mockito.mock(ApplicationContext.class);
// register the mocked context so lookups of ProcessService return the mock
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
spark2Task = new SparkTask(taskExecutionContext, logger);
spark2Task.init();
}
/**
* Checks Spark task initialization and the generated command lines.
*
* NOTE(review): the first part (using TaskProps and local copies of the params JSON)
* looks like the pre-change test body, and the part starting at the empty
* TaskExecutionContext looks like its replacement; verify against the real file.
*/
@Test
public void testSparkTaskInit() {
TaskProps taskProps = new TaskProps();
// local copies of the params JSON; these shadow the fields of the same name
String spark1Params = "{" +
"\"mainArgs\":\"\", " +
"\"driverMemory\":\"1G\", " +
"\"executorMemory\":\"2G\", " +
"\"programType\":\"SCALA\", " +
"\"mainClass\":\"basicetl.GlobalUserCar\", " +
"\"driverCores\":\"2\", " +
"\"deployMode\":\"cluster\", " +
"\"executorCores\":2, " +
"\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " +
"\"sparkVersion\":\"SPARK1\", " +
"\"numExecutors\":\"10\", " +
"\"localParams\":[], " +
"\"others\":\"\", " +
"\"resourceList\":[]" +
"}";
String spark2Params = "{" +
"\"mainArgs\":\"\", " +
"\"driverMemory\":\"1G\", " +
"\"executorMemory\":\"2G\", " +
"\"programType\":\"SCALA\", " +
"\"mainClass\":\"basicetl.GlobalUserCar\", " +
"\"driverCores\":\"2\", " +
"\"deployMode\":\"cluster\", " +
"\"executorCores\":2, " +
"\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " +
"\"sparkVersion\":\"SPARK2\", " +
"\"numExecutors\":\"10\", " +
"\"localParams\":[], " +
"\"others\":\"\", " +
"\"resourceList\":[]" +
"}";
taskProps.setTaskParams(spark2Params);
logger.info("spark task params {}", taskProps.getTaskParams());
SparkParameters sparkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), SparkParameters.class);
assert sparkParameters != null;
if (!sparkParameters.checkParameters()) {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(taskProps.getQueue());
// main args are empty in the params above, so this branch is not expected to run here
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();
/**
* combining local and global parameters
*/
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null) {
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
}
sparkParameters.setMainArgs(args);
}
List<String> args = new ArrayList<>();
//spark version
// the SPARK2 command is the default; it is replaced only when the version is SPARK1
String sparkCommand = SPARK2_COMMAND;
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SPARK1_COMMAND;
}
args.add(sparkCommand);
// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
String sparkArgs = String.join(" ", args);
logger.info("spark task command : {}", sparkArgs);
// the first token of the joined command must be the SPARK2 submit command
Assert.assertEquals(SPARK2_COMMAND, sparkArgs.split(" ")[0]);
// a task built from an empty context: init() is called and the context's task params stay null
TaskExecutionContext sparkTaskCtx = new TaskExecutionContext();
SparkTask sparkTask = new SparkTask(sparkTaskCtx, logger);
sparkTask.init();
sparkTask.getParameters();
Assert.assertNull(sparkTaskCtx.getTaskParams());
// full command expected from the SPARK2 task prepared in the @Before method
String spark2Command = spark2Task.buildCommand();
String spark2Expected = "${SPARK_HOME2}/bin/spark-submit --master yarn --deploy-mode cluster "
+ "--class basicetl.GlobalUserCar --driver-cores 2 --driver-memory 1G --num-executors 10 "
+ "--executor-cores 2 --executor-memory 2G --queue dev test-1.0-SNAPSHOT.jar";
Assert.assertEquals(spark2Expected, spark2Command);
// reuse the shared context with the SPARK1 params; only the submit command prefix differs
taskExecutionContext.setTaskParams(spark1Params);
SparkTask spark1Task = new SparkTask(taskExecutionContext, logger);
spark1Task.init();
String spark1Command = spark1Task.buildCommand();
String spark1Expected = "${SPARK_HOME1}/bin/spark-submit --master yarn --deploy-mode cluster "
+ "--class basicetl.GlobalUserCar --driver-cores 2 --driver-memory 1G --num-executors 10 "
+ "--executor-cores 2 --executor-memory 2G --queue dev test-1.0-SNAPSHOT.jar";
Assert.assertEquals(spark1Expected, spark1Command);
}
}

Loading…
Cancel
Save