diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/DataQualityApplication.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/DataQualityApplication.java index 91e8716975..6918a548ac 100644 --- a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/DataQualityApplication.java +++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/DataQualityApplication.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.data.quality; import static org.apache.dolphinscheduler.data.quality.Constants.SPARK_APP_NAME; +import static org.apache.dolphinscheduler.data.quality.enums.ReaderType.HIVE; import org.apache.dolphinscheduler.data.quality.config.Config; import org.apache.dolphinscheduler.data.quality.config.DataQualityConfiguration; @@ -64,9 +65,16 @@ public class DataQualityApplication { config.put(SPARK_APP_NAME, dataQualityConfiguration.getName()); } - SparkRuntimeEnvironment sparkRuntimeEnvironment = new SparkRuntimeEnvironment(config); + boolean hiveClientSupport = dataQualityConfiguration + .getReaderConfigs() + .stream() + .anyMatch(line -> line.getType().equalsIgnoreCase(HIVE.name())); + + SparkRuntimeEnvironment sparkRuntimeEnvironment = new SparkRuntimeEnvironment(config, hiveClientSupport); + DataQualityContext dataQualityContext = new DataQualityContext(sparkRuntimeEnvironment, dataQualityConfiguration); + dataQualityContext.execute(); } } diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/execution/SparkRuntimeEnvironment.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/execution/SparkRuntimeEnvironment.java index 2e46a3c44b..34a9906e14 100644 --- a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/execution/SparkRuntimeEnvironment.java +++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/execution/SparkRuntimeEnvironment.java @@ -34,20 +34,23 @@ public class SparkRuntimeEnvironment { private Config config = new Config(); - public SparkRuntimeEnvironment(Config config) { + public SparkRuntimeEnvironment(Config config, boolean hiveClientSupport) { if (config != null) { this.config = config; } - this.prepare(); + this.prepare(hiveClientSupport); } public Config getConfig() { return this.config; } - public void prepare() { - sparkSession = SparkSession.builder().config(createSparkConf()).enableHiveSupport().getOrCreate(); + public void prepare(boolean hiveClientSupport) { + SparkSession.Builder sparkSessionBuilder = SparkSession.builder().config(createSparkConf()); + + this.sparkSession = hiveClientSupport ? sparkSessionBuilder.enableHiveSupport().getOrCreate() + : sparkSessionBuilder.getOrCreate(); } private SparkConf createSparkConf() { diff --git a/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/SparkApplicationTestBase.java b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/SparkApplicationTestBase.java index c92fe47e9c..6935276049 100644 --- a/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/SparkApplicationTestBase.java +++ b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/SparkApplicationTestBase.java @@ -41,6 +41,7 @@ public class SparkApplicationTestBase { config.put("spark.ui.port", 13000); config.put("spark.master", "local[4]"); - sparkRuntimeEnvironment = new SparkRuntimeEnvironment(new Config(config)); + // The hive client is disabled by default, and the local execution of Unit Test is guaranteed to be successful. + sparkRuntimeEnvironment = new SparkRuntimeEnvironment(new Config(config), false); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java index 699fc98d76..2e384ef48e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java @@ -39,6 +39,9 @@ public class SparkArgsUtils { private static final String SPARK_ON_YARN = "yarn"; + private static final String DEFAULT_QUALITY_CLASS = + "org.apache.dolphinscheduler.data.quality.DataQualityApplication"; + private SparkArgsUtils() { throw new IllegalStateException("Utility class"); } @@ -62,9 +65,9 @@ public class SparkArgsUtils { ProgramType programType = param.getProgramType(); String mainClass = param.getMainClass(); - if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { + if (programType != null && programType != ProgramType.PYTHON) { args.add(SparkConstants.MAIN_CLASS); - args.add(mainClass); + args.add(StringUtils.isNotEmpty(mainClass) ? mainClass : DEFAULT_QUALITY_CLASS); } int driverCores = param.getDriverCores();