From e4fb5b30a45b5e2380841e8b37dff2f0408fc5a7 Mon Sep 17 00:00:00 2001 From: xuhui1231 <51685494+xuhui1231@users.noreply.github.com> Date: Fri, 21 Jul 2023 16:14:21 +0800 Subject: [PATCH] datasource test and sql task Remove connection pool issues is #14179 (#14193) * datasource test and sql task Remove connection pool issues is #14179 * datasource test and sql task Remove connection pool issues is #14179 uniform style * datasource test and sql task Remove connection pool issues is #14179 uniform style by 20230720 * datasource test and sql task Remove connection pool issues is #14179 uniform style by 20230720 --------- Co-authored-by: xuhui Co-authored-by: Jay Chung --- .../api/client/CommonDataSourceClient.java | 42 ++++++++++------- .../azuresql/AzureSQLDataSourceClient.java | 34 -------------- .../datasource/hive/HiveDataSourceClient.java | 30 ------------ .../kyuubi/KyuubiDataSourceClient.java | 27 ----------- .../kyuubi/KyuubiDataSourceClientTest.java | 8 ---- .../redshift/RedshiftDataSourceClient.java | 47 ------------------- 6 files changed, 26 insertions(+), 162 deletions(-) diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java index c87b3453a1..37d783b640 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.plugin.datasource.api.client; -import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.DataSourceClient; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -25,15 +24,13 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.commons.lang3.StringUtils; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; -import org.springframework.jdbc.core.JdbcTemplate; - import com.google.common.base.Stopwatch; -import com.zaxxer.hikari.HikariDataSource; @Slf4j public class CommonDataSourceClient implements DataSourceClient { @@ -42,8 +39,7 @@ public class CommonDataSourceClient implements DataSourceClient { public static final String COMMON_VALIDATION_QUERY = "select 1"; protected final BaseConnectionParam baseConnectionParam; - protected HikariDataSource dataSource; - protected JdbcTemplate jdbcTemplate; + protected Connection connection; public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { this.baseConnectionParam = baseConnectionParam; @@ -63,8 +59,7 @@ public class CommonDataSourceClient implements DataSourceClient { } protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) { - this.dataSource = JDBCDataSourceProvider.createJdbcDataSource(baseConnectionParam, dbType); - this.jdbcTemplate = new JdbcTemplate(dataSource); + this.connection = buildConn(baseConnectionParam); } protected void checkUser(BaseConnectionParam baseConnectionParam) { @@ -73,6 +68,20 @@ public class CommonDataSourceClient implements DataSourceClient { } } + private Connection buildConn(BaseConnectionParam baseConnectionParam) { + Connection conn = null; + try { + Class.forName(baseConnectionParam.getDriverClassName()); + conn = DriverManager.getConnection(baseConnectionParam.getJdbcUrl(), baseConnectionParam.getUser(), + baseConnectionParam.getPassword()); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Driver load fail", e); + } catch (SQLException e) { + throw new RuntimeException("JDBC connect failed", e); + } + return conn; + } + protected void setDefaultUsername(BaseConnectionParam baseConnectionParam) { baseConnectionParam.setUser(COMMON_USER); } @@ -92,7 +101,7 @@ public class CommonDataSourceClient implements DataSourceClient { // Checking data source client Stopwatch stopwatch = Stopwatch.createStarted(); try { - this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery()); + this.connection.prepareStatement(this.baseConnectionParam.getValidationQuery()).executeQuery(); } catch (Exception e) { throw new RuntimeException("JDBC connect failed", e); } finally { @@ -104,20 +113,21 @@ public class CommonDataSourceClient implements DataSourceClient { @Override public Connection getConnection() { try { - return this.dataSource.getConnection(); + return connection.isClosed() ? buildConn(baseConnectionParam) : connection; } catch (SQLException e) { - log.error("get druidDataSource Connection fail SQLException: {}", e.getMessage(), e); - return null; + throw new RuntimeException("get conn is fail", e); } } @Override public void close() { - log.info("do close dataSource {}.", baseConnectionParam.getDatabase()); - try (HikariDataSource closedDatasource = dataSource) { - // only close the resource + log.info("do close connection {}.", baseConnectionParam.getDatabase()); + try { + connection.close(); + } catch (SQLException e) { + log.info("colse connection fail"); + throw new RuntimeException(e); } - this.jdbcTemplate = null; } } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java index cf7db2e3b2..53af3946b8 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java @@ -25,14 +25,9 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; -import com.google.common.base.Stopwatch; - @Slf4j public class AzureSQLDataSourceClient extends CommonDataSourceClient { @@ -49,33 +44,4 @@ public class AzureSQLDataSourceClient extends CommonDataSourceClient { return AzureSQLDataSourceProcessor.tokenGetConnection(connectionParam); } - @Override - public void checkClient() { - - AzureSQLConnectionParam connectionParam = (AzureSQLConnectionParam) this.baseConnectionParam; - Stopwatch stopwatch = Stopwatch.createStarted(); - String validationQuery = this.baseConnectionParam.getValidationQuery(); - if (!connectionParam.getMode().equals(AzureSQLAuthMode.ACCESSTOKEN)) { - // Checking data source client - try { - this.jdbcTemplate.execute(validationQuery); - } catch (Exception e) { - throw new RuntimeException("JDBC connect failed", e); - } finally { - log.info("Time to execute check jdbc client with sql {} for {} ms ", - this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); - } - } else { - try (Statement statement = getConnection().createStatement()) { - if (!statement.execute(validationQuery)) { - throw new SQLException("execute check azure sql token client failed : " + validationQuery); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - log.info("Time to execute check azure sql token client with sql {} for {} ms ", - this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); - } - } - } } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java index 15270f60a3..3c28551e44 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java @@ -23,7 +23,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SEC import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; -import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -33,13 +32,9 @@ import sun.security.krb5.Config; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Field; -import java.sql.Connection; -import java.sql.SQLException; import lombok.extern.slf4j.Slf4j; -import org.springframework.jdbc.core.JdbcTemplate; - @Slf4j public class HiveDataSourceClient extends CommonDataSourceClient { @@ -52,17 +47,6 @@ public class HiveDataSourceClient extends CommonDataSourceClient { log.info("PreInit in {}", getClass().getName()); } - @Override - protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) { - log.info("Create UserGroupInformation."); - UserGroupInformationFactory.login(baseConnectionParam.getUser()); - log.info("Create ugi success."); - - this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType); - this.jdbcTemplate = new JdbcTemplate(dataSource); - log.info("Init {} success.", getClass().getName()); - } - @Override protected void checkEnv(BaseConnectionParam baseConnectionParam) { super.checkEnv(baseConnectionParam); @@ -86,20 +70,6 @@ public class HiveDataSourceClient extends CommonDataSourceClient { } } - @Override - public Connection getConnection() { - Connection connection = null; - while (connection == null) { - try { - connection = dataSource.getConnection(); - } catch (SQLException e) { - UserGroupInformationFactory.logout(baseConnectionParam.getUser()); - UserGroupInformationFactory.login(baseConnectionParam.getUser()); - } - } - return connection; - } - @Override public void close() { try { diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java index 3e0af69577..2d3954fff3 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java @@ -18,17 +18,11 @@ package org.apache.dolphinscheduler.plugin.datasource.kyuubi; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; -import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; -import java.sql.Connection; -import java.sql.SQLException; - import lombok.extern.slf4j.Slf4j; -import org.springframework.jdbc.core.JdbcTemplate; - @Slf4j public class KyuubiDataSourceClient extends CommonDataSourceClient { @@ -41,32 +35,11 @@ public class KyuubiDataSourceClient extends CommonDataSourceClient { log.info("PreInit in {}", getClass().getName()); } - @Override - protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) { - - this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType); - this.jdbcTemplate = new JdbcTemplate(dataSource); - log.info("Init {} success.", getClass().getName()); - } - @Override protected void checkEnv(BaseConnectionParam baseConnectionParam) { super.checkEnv(baseConnectionParam); } - @Override - public Connection getConnection() { - Connection connection = null; - while (connection == null) { - try { - connection = dataSource.getConnection(); - } catch (SQLException e) { - log.error("Failed to get Kyuubi Connection.", e); - } - } - return connection; - } - @Override public void close() { super.close(); diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java index 041420cc48..03b6f44c24 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.plugin.datasource.kyuubi; import org.apache.dolphinscheduler.plugin.datasource.kyuubi.param.KyuubiConnectionParam; -import org.apache.dolphinscheduler.spi.enums.DbType; import java.sql.Connection; @@ -49,13 +48,6 @@ public class KyuubiDataSourceClientTest { Mockito.verify(kyuubiDataSourceClient).checkEnv(kyuubiConnectionParam); } - @Test - public void testInitClient() { - KyuubiConnectionParam kyuubiConnectionParam = new KyuubiConnectionParam(); - kyuubiDataSourceClient.initClient(kyuubiConnectionParam, DbType.KYUUBI); - Mockito.verify(kyuubiDataSourceClient).initClient(kyuubiConnectionParam, DbType.KYUUBI); - } - @Test public void testCheckClient() { kyuubiDataSourceClient.checkClient(); diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java index 186e5afd19..74aeb30ee2 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java @@ -18,21 +18,11 @@ package org.apache.dolphinscheduler.plugin.datasource.redshift; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; -import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftAuthMode; -import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftConnectionParam; -import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftDataSourceProcessor; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.concurrent.TimeUnit; - import lombok.extern.slf4j.Slf4j; -import com.google.common.base.Stopwatch; - @Slf4j public class RedshiftDataSourceClient extends CommonDataSourceClient { @@ -40,41 +30,4 @@ public class RedshiftDataSourceClient extends CommonDataSourceClient { super(baseConnectionParam, dbType); } - @Override - public Connection getConnection() { - RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam; - if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) { - return super.getConnection(); - } - return RedshiftDataSourceProcessor.getConnectionByIAM(connectionParam); - } - - @Override - public void checkClient() { - RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam; - Stopwatch stopwatch = Stopwatch.createStarted(); - String validationQuery = this.baseConnectionParam.getValidationQuery(); - if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) { - // Checking data source client - try { - this.jdbcTemplate.execute(validationQuery); - } catch (Exception e) { - throw new RuntimeException("JDBC connect failed", e); - } finally { - log.info("Time to execute check jdbc client with sql {} for {} ms ", - this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); - } - } else { - try (Statement statement = getConnection().createStatement()) { - if (!statement.execute(validationQuery)) { - throw new SQLException("execute check redshift access key failed : " + validationQuery); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - log.info("Time to execute check redshift access key with sql {} for {} ms ", - this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); - } - } - } }