diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
index 5cc7bd831a..0c68016db0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
@@ -14,24 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.utils;
+package org.apache.dolphinscheduler.server.utils;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 
 import java.util.ArrayList;
 import java.util.List;
 
-
 /**
- * spark args utils
+ * spark args utils
 */
 public class SparkArgsUtils {
 
+    private static final String SPARK_CLUSTER = "cluster";
+
+    private static final String SPARK_LOCAL = "local";
+
+    private static final String SPARK_ON_YARN = "yarn";
+
     /**
      * build args
      *
@@ -40,15 +45,15 @@ public class SparkArgsUtils {
      */
     public static List<String> buildArgs(SparkParameters param) {
         List<String> args = new ArrayList<>();
-        String deployMode = "cluster";
+        String deployMode = SPARK_CLUSTER;
 
         args.add(Constants.MASTER);
-        if(StringUtils.isNotEmpty(param.getDeployMode())){
+        if (StringUtils.isNotEmpty(param.getDeployMode())) {
             deployMode = param.getDeployMode();
         }
-        if(!"local".equals(deployMode)){
-            args.add("yarn");
+        if (!SPARK_LOCAL.equals(deployMode)) {
+            args.add(SPARK_ON_YARN);
             args.add(Constants.DEPLOY_MODE);
         }
@@ -56,7 +61,7 @@ public class SparkArgsUtils {
         ProgramType type = param.getProgramType();
         String mainClass = param.getMainClass();
-        if(type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)){
+        if (type != null && type != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
             args.add(Constants.MAIN_CLASS);
             args.add(mainClass);
         }
@@ -96,14 +101,14 @@ public class SparkArgsUtils {
         String queue = param.getQueue();
 
         if (StringUtils.isNotEmpty(others)) {
-            if(!others.contains(Constants.SPARK_QUEUE) && StringUtils.isNotEmpty(queue)){
+            if (!others.contains(Constants.SPARK_QUEUE) && StringUtils.isNotEmpty(queue)) {
                 args.add(Constants.SPARK_QUEUE);
                 args.add(queue);
             }
 
             args.add(others);
-        }else if (StringUtils.isNotEmpty(queue)) {
+        } else if (StringUtils.isNotEmpty(queue)) {
             args.add(Constants.SPARK_QUEUE);
             args.add(queue);
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index c540d27810..4e1a4d5356 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.spark;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -22,133 +23,136 @@ import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.slf4j.Logger;
+
 /**
  * spark task
  */
 public class SparkTask extends AbstractYarnTask {
 
-  /**
-   * spark1 command
-   */
-  private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
+    /**
+     * spark1 command
+     */
+    private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
+
+    /**
+     * spark2 command
+     */
+    private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
+
+    /**
+     * spark parameters
+     */
+    private SparkParameters sparkParameters;
+
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext sparkTaskExecutionContext;
+
+    public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
+        super(taskExecutionContext, logger);
+        this.sparkTaskExecutionContext = taskExecutionContext;
+    }
 
-  /**
-   * spark2 command
-   */
-  private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
+    @Override
+    public void init() {
 
-  /**
-   * spark parameters
-   */
-  private SparkParameters sparkParameters;
+        logger.info("spark task params {}", sparkTaskExecutionContext.getTaskParams());
 
-  /**
-   * taskExecutionContext
-   */
-  private TaskExecutionContext taskExecutionContext;
+        sparkParameters = JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(), SparkParameters.class);
 
-  public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
-    super(taskExecutionContext, logger);
-    this.taskExecutionContext = taskExecutionContext;
-  }
+        if (null == sparkParameters) {
+            logger.error("Spark params is null");
+            return;
+        }
 
-  @Override
-  public void init() {
+        if (!sparkParameters.checkParameters()) {
+            throw new RuntimeException("spark task params is not valid");
+        }
+        sparkParameters.setQueue(sparkTaskExecutionContext.getQueue());
+        setMainJarName();
+    }
 
-    logger.info("spark task params {}", taskExecutionContext.getTaskParams());
+    /**
+     * create command
+     *
+     * @return command
+     */
+    @Override
+    protected String buildCommand() {
+        List<String> args = new ArrayList<>();
 
-    sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class);
+        //spark version
+        String sparkCommand = SPARK2_COMMAND;
 
-    if (!sparkParameters.checkParameters()) {
-      throw new RuntimeException("spark task params is not valid");
-    }
-    sparkParameters.setQueue(taskExecutionContext.getQueue());
+        if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
+            sparkCommand = SPARK1_COMMAND;
+        }
 
-    setMainJarName();
+        args.add(sparkCommand);
 
-    if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
-      String args = sparkParameters.getMainArgs();
+        // other parameters
+        args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
 
-      // replace placeholder
-      Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
-          taskExecutionContext.getDefinedParams(),
-          sparkParameters.getLocalParametersMap(),
-          CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
-          taskExecutionContext.getScheduleTime());
+        // replace placeholder
+        Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()),
+                sparkTaskExecutionContext.getDefinedParams(),
+                sparkParameters.getLocalParametersMap(),
+                CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()),
+                sparkTaskExecutionContext.getScheduleTime());
 
-      if (paramsMap != null ){
-        args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
-      }
-      sparkParameters.setMainArgs(args);
-    }
-  }
+        String command = null;
 
-  /**
-   * create command
-   * @return command
-   */
-  @Override
-  protected String buildCommand() {
-    List<String> args = new ArrayList<>();
+        if (null != paramsMap) {
+            command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
+        }
 
-    //spark version
-    String sparkCommand = SPARK2_COMMAND;
+        logger.info("spark task command: {}", command);
 
-    if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
-      sparkCommand = SPARK1_COMMAND;
+        return command;
     }
 
-    args.add(sparkCommand);
-
-    // other parameters
-    args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
-
-    String command = ParameterUtils
-        .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
-
-    logger.info("spark task command : {}", command);
-
-    return command;
-  }
-
-  @Override
-  protected void setMainJarName() {
-    // main jar
-    ResourceInfo mainJar = sparkParameters.getMainJar();
-    if (mainJar != null) {
-      int resourceId = mainJar.getId();
-      String resourceName;
-      if (resourceId == 0) {
-        resourceName = mainJar.getRes();
-      } else {
-        Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId());
-        if (resource == null) {
-          logger.error("resource id: {} not exist", resourceId);
-          throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
+    @Override
+    protected void setMainJarName() {
+        // main jar
+        ResourceInfo mainJar = sparkParameters.getMainJar();
+
+        if (null == mainJar) {
+            throw new RuntimeException("Spark task jar params is null");
         }
-        resourceName = resource.getFullName().replaceFirst("/", "");
-      }
-      mainJar.setRes(resourceName);
-      sparkParameters.setMainJar(mainJar);
+
+        int resourceId = mainJar.getId();
+        String resourceName;
+        if (resourceId == 0) {
+            resourceName = mainJar.getRes();
+        } else {
+            Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId());
+            if (resource == null) {
+                logger.error("resource id: {} not exist", resourceId);
+                throw new RuntimeException(String.format("resource id: %d not exist", resourceId));
+            }
+            resourceName = resource.getFullName().replaceFirst("/", "");
+        }
+        mainJar.setRes(resourceName);
+        sparkParameters.setMainJar(mainJar);
+    }
 
-  }
-  @Override
-  public AbstractParameters getParameters() {
-    return sparkParameters;
-  }
+    @Override
+    public AbstractParameters getParameters() {
+        return sparkParameters;
+    }
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java
index 521650d3b6..2a3606dddf 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java
@@ -14,128 +14,121 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.spark;
 
-import org.apache.dolphinscheduler.common.enums.SparkVersion;
-import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
-import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.server.utils.ParamUtils;
-import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.PropertyPlaceholderHelper;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ParameterUtils.class, PlaceholderUtils.class, PropertyPlaceholderHelper.class})
 public class SparkTaskTest {
 
     private static final Logger logger = LoggerFactory.getLogger(SparkTaskTest.class);
 
-    /**
-     * spark1 command
-     */
-    private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";
-
-    /**
-     * spark2 command
-     */
-    private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
+    private TaskExecutionContext taskExecutionContext;
+
+    private ApplicationContext applicationContext;
+
+    private ProcessService processService;
+
+    private SparkTask spark2Task;
+
+    String spark1Params = "{"
+            + "\"mainArgs\":\"\", "
+            + "\"driverMemory\":\"1G\", "
+            + "\"executorMemory\":\"2G\", "
+            + "\"programType\":\"SCALA\", "
+            + "\"mainClass\":\"basicetl.GlobalUserCar\", "
+            + "\"driverCores\":\"2\", "
+            + "\"deployMode\":\"cluster\", "
+            + "\"executorCores\":2, "
+            + "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, "
+            + "\"sparkVersion\":\"SPARK1\", "
+            + "\"numExecutors\":\"10\", "
+            + "\"localParams\":[], "
+            + "\"others\":\"\", "
+            + "\"resourceList\":[]"
+            + "}";
+
+    String spark2Params = "{"
+            + "\"mainArgs\":\"\", "
+            + "\"driverMemory\":\"1G\", "
+            + "\"executorMemory\":\"2G\", "
+            + "\"programType\":\"SCALA\", "
"\"mainClass\":\"basicetl.GlobalUserCar\", " + + "\"driverCores\":\"2\", " + + "\"deployMode\":\"cluster\", " + + "\"executorCores\":2, " + + "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " + + "\"sparkVersion\":\"SPARK2\", " + + "\"numExecutors\":\"10\", " + + "\"localParams\":[], " + + "\"others\":\"\", " + + "\"resourceList\":[]" + + "}"; + + @Before + public void setTaskExecutionContext() { + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskParams(spark2Params); + taskExecutionContext.setQueue("dev"); + taskExecutionContext.setTaskAppId(String.valueOf(System.currentTimeMillis())); + taskExecutionContext.setTenantCode("1"); + taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh"); + taskExecutionContext.setStartTime(new Date()); + taskExecutionContext.setTaskTimeout(0); + + processService = Mockito.mock(ProcessService.class); + applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + + spark2Task = new SparkTask(taskExecutionContext, logger); + spark2Task.init(); + } @Test public void testSparkTaskInit() { - TaskProps taskProps = new TaskProps(); - - String spark1Params = "{" + - "\"mainArgs\":\"\", " + - "\"driverMemory\":\"1G\", " + - "\"executorMemory\":\"2G\", " + - "\"programType\":\"SCALA\", " + - "\"mainClass\":\"basicetl.GlobalUserCar\", " + - "\"driverCores\":\"2\", " + - "\"deployMode\":\"cluster\", " + - "\"executorCores\":2, " + - "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " + - "\"sparkVersion\":\"SPARK1\", " + - "\"numExecutors\":\"10\", " + - "\"localParams\":[], " + - "\"others\":\"\", " + - "\"resourceList\":[]" + - "}"; - - String spark2Params = "{" + - "\"mainArgs\":\"\", " + - "\"driverMemory\":\"1G\", " + - "\"executorMemory\":\"2G\", " + - "\"programType\":\"SCALA\", " + - "\"mainClass\":\"basicetl.GlobalUserCar\", " + - "\"driverCores\":\"2\", " + - "\"deployMode\":\"cluster\", " + - "\"executorCores\":2, " + - "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " + - "\"sparkVersion\":\"SPARK2\", " + - "\"numExecutors\":\"10\", " + - "\"localParams\":[], " + - "\"others\":\"\", " + - "\"resourceList\":[]" + - "}"; - - taskProps.setTaskParams(spark2Params); - - logger.info("spark task params {}", taskProps.getTaskParams()); - - SparkParameters sparkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), SparkParameters.class); - - assert sparkParameters != null; - if (!sparkParameters.checkParameters()) { - throw new RuntimeException("spark task params is not valid"); - } - sparkParameters.setQueue(taskProps.getQueue()); - - if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) { - String args = sparkParameters.getMainArgs(); - - /** - * combining local and global parameters - */ - Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), - taskProps.getDefinedParams(), - sparkParameters.getLocalParametersMap(), - taskProps.getCmdTypeIfComplement(), - taskProps.getScheduleTime()); - if (paramsMap != null) { - args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); - } - sparkParameters.setMainArgs(args); - } - - List args = new ArrayList<>(); - - //spark version - String sparkCommand = SPARK2_COMMAND; - - if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) { - sparkCommand = SPARK1_COMMAND; - } 
- - args.add(sparkCommand); - - // other parameters - args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); - - String sparkArgs = String.join(" ", args); - - logger.info("spark task command : {}", sparkArgs); - - Assert.assertEquals(SPARK2_COMMAND, sparkArgs.split(" ")[0]); - + TaskExecutionContext sparkTaskCtx = new TaskExecutionContext(); + SparkTask sparkTask = new SparkTask(sparkTaskCtx, logger); + sparkTask.init(); + sparkTask.getParameters(); + Assert.assertNull(sparkTaskCtx.getTaskParams()); + + String spark2Command = spark2Task.buildCommand(); + String spark2Expected = "${SPARK_HOME2}/bin/spark-submit --master yarn --deploy-mode cluster " + + "--class basicetl.GlobalUserCar --driver-cores 2 --driver-memory 1G --num-executors 10 " + + "--executor-cores 2 --executor-memory 2G --queue dev test-1.0-SNAPSHOT.jar"; + Assert.assertEquals(spark2Expected, spark2Command); + + taskExecutionContext.setTaskParams(spark1Params); + + SparkTask spark1Task = new SparkTask(taskExecutionContext, logger); + spark1Task.init(); + String spark1Command = spark1Task.buildCommand(); + String spark1Expected = "${SPARK_HOME1}/bin/spark-submit --master yarn --deploy-mode cluster " + + "--class basicetl.GlobalUserCar --driver-cores 2 --driver-memory 1G --num-executors 10 " + + "--executor-cores 2 --executor-memory 2G --queue dev test-1.0-SNAPSHOT.jar"; + Assert.assertEquals(spark1Expected, spark1Command); } }