diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java index 37d783b640..c87b3453a1 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.datasource.api.client; +import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.DataSourceClient; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -24,13 +25,15 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.commons.lang3.StringUtils; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; + import com.google.common.base.Stopwatch; +import com.zaxxer.hikari.HikariDataSource; @Slf4j public class CommonDataSourceClient implements DataSourceClient { @@ -39,7 +42,8 @@ public class CommonDataSourceClient implements DataSourceClient { public static final String COMMON_VALIDATION_QUERY = "select 1"; protected final BaseConnectionParam baseConnectionParam; - protected Connection connection; + protected HikariDataSource dataSource; + protected JdbcTemplate jdbcTemplate; public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { this.baseConnectionParam = baseConnectionParam; @@ -59,7 +63,8 @@ public class CommonDataSourceClient implements DataSourceClient { } protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) { - this.connection = buildConn(baseConnectionParam); + this.dataSource = JDBCDataSourceProvider.createJdbcDataSource(baseConnectionParam, dbType); + this.jdbcTemplate = new JdbcTemplate(dataSource); } protected void checkUser(BaseConnectionParam baseConnectionParam) { @@ -68,20 +73,6 @@ public class CommonDataSourceClient implements DataSourceClient { } } - private Connection buildConn(BaseConnectionParam baseConnectionParam) { - Connection conn = null; - try { - Class.forName(baseConnectionParam.getDriverClassName()); - conn = DriverManager.getConnection(baseConnectionParam.getJdbcUrl(), baseConnectionParam.getUser(), - baseConnectionParam.getPassword()); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Driver load fail", e); - } catch (SQLException e) { - throw new RuntimeException("JDBC connect failed", e); - } - return conn; - } - protected void setDefaultUsername(BaseConnectionParam baseConnectionParam) { baseConnectionParam.setUser(COMMON_USER); } @@ -101,7 +92,7 @@ public class CommonDataSourceClient implements DataSourceClient { // Checking data source client Stopwatch stopwatch = Stopwatch.createStarted(); try { - this.connection.prepareStatement(this.baseConnectionParam.getValidationQuery()).executeQuery(); + this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery()); } catch (Exception e) { throw new RuntimeException("JDBC connect failed", e); } finally { @@ -113,21 +104,20 @@ public class CommonDataSourceClient implements DataSourceClient { @Override public Connection getConnection() { try { - return connection.isClosed() ? buildConn(baseConnectionParam) : connection; + return this.dataSource.getConnection(); } catch (SQLException e) { - throw new RuntimeException("get conn is fail", e); + log.error("get druidDataSource Connection fail SQLException: {}", e.getMessage(), e); + return null; } } @Override public void close() { - log.info("do close connection {}.", baseConnectionParam.getDatabase()); - try { - connection.close(); - } catch (SQLException e) { - log.info("colse connection fail"); - throw new RuntimeException(e); + log.info("do close dataSource {}.", baseConnectionParam.getDatabase()); + try (HikariDataSource closedDatasource = dataSource) { + // only close the resource } + this.jdbcTemplate = null; } } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java index 53af3946b8..cf7db2e3b2 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java @@ -25,9 +25,14 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import com.google.common.base.Stopwatch; + @Slf4j public class AzureSQLDataSourceClient extends CommonDataSourceClient { @@ -44,4 +49,33 @@ public class AzureSQLDataSourceClient extends CommonDataSourceClient { return AzureSQLDataSourceProcessor.tokenGetConnection(connectionParam); } + @Override + public void checkClient() { + + AzureSQLConnectionParam connectionParam = (AzureSQLConnectionParam) this.baseConnectionParam; + Stopwatch stopwatch = Stopwatch.createStarted(); + String validationQuery = this.baseConnectionParam.getValidationQuery(); + if (!connectionParam.getMode().equals(AzureSQLAuthMode.ACCESSTOKEN)) { + // Checking data source client + try { + this.jdbcTemplate.execute(validationQuery); + } catch (Exception e) { + throw new RuntimeException("JDBC connect failed", e); + } finally { + log.info("Time to execute check jdbc client with sql {} for {} ms ", + this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + } else { + try (Statement statement = getConnection().createStatement()) { + if (!statement.execute(validationQuery)) { + throw new SQLException("execute check azure sql token client failed : " + validationQuery); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + log.info("Time to execute check azure sql token client with sql {} for {} ms ", + this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + } + } } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java index 3c28551e44..15270f60a3 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java @@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SEC import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; +import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -32,9 +33,13 @@ import sun.security.krb5.Config; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Field; +import java.sql.Connection; +import java.sql.SQLException; import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; + @Slf4j public class HiveDataSourceClient extends CommonDataSourceClient { @@ -47,6 +52,17 @@ public class HiveDataSourceClient extends CommonDataSourceClient { log.info("PreInit in {}", getClass().getName()); } + @Override + protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + log.info("Create UserGroupInformation."); + UserGroupInformationFactory.login(baseConnectionParam.getUser()); + log.info("Create ugi success."); + + this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType); + this.jdbcTemplate = new JdbcTemplate(dataSource); + log.info("Init {} success.", getClass().getName()); + } + @Override protected void checkEnv(BaseConnectionParam baseConnectionParam) { super.checkEnv(baseConnectionParam); @@ -70,6 +86,20 @@ public class HiveDataSourceClient extends CommonDataSourceClient { } } + @Override + public Connection getConnection() { + Connection connection = null; + while (connection == null) { + try { + connection = dataSource.getConnection(); + } catch (SQLException e) { + UserGroupInformationFactory.logout(baseConnectionParam.getUser()); + UserGroupInformationFactory.login(baseConnectionParam.getUser()); + } + } + return connection; + } + @Override public void close() { try { diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java index 2d3954fff3..3e0af69577 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java @@ -18,11 +18,17 @@ package org.apache.dolphinscheduler.plugin.datasource.kyuubi; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; +import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; +import java.sql.Connection; +import java.sql.SQLException; + import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; + @Slf4j public class KyuubiDataSourceClient extends CommonDataSourceClient { @@ -35,11 +41,32 @@ public class KyuubiDataSourceClient extends CommonDataSourceClient { log.info("PreInit in {}", getClass().getName()); } + @Override + protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + + this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType); + this.jdbcTemplate = new JdbcTemplate(dataSource); + log.info("Init {} success.", getClass().getName()); + } + @Override protected void checkEnv(BaseConnectionParam baseConnectionParam) { super.checkEnv(baseConnectionParam); } + @Override + public Connection getConnection() { + Connection connection = null; + while (connection == null) { + try { + connection = dataSource.getConnection(); + } catch (SQLException e) { + log.error("Failed to get Kyuubi Connection.", e); + } + } + return connection; + } + @Override public void close() { super.close(); diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java index 03b6f44c24..041420cc48 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.plugin.datasource.kyuubi; import org.apache.dolphinscheduler.plugin.datasource.kyuubi.param.KyuubiConnectionParam; +import org.apache.dolphinscheduler.spi.enums.DbType; import java.sql.Connection; @@ -48,6 +49,13 @@ public class KyuubiDataSourceClientTest { Mockito.verify(kyuubiDataSourceClient).checkEnv(kyuubiConnectionParam); } + @Test + public void testInitClient() { + KyuubiConnectionParam kyuubiConnectionParam = new KyuubiConnectionParam(); + kyuubiDataSourceClient.initClient(kyuubiConnectionParam, DbType.KYUUBI); + Mockito.verify(kyuubiDataSourceClient).initClient(kyuubiConnectionParam, DbType.KYUUBI); + } + @Test public void testCheckClient() { kyuubiDataSourceClient.checkClient(); diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java index 74aeb30ee2..186e5afd19 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java @@ -18,11 +18,21 @@ package org.apache.dolphinscheduler.plugin.datasource.redshift; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; +import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftAuthMode; +import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftConnectionParam; +import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftDataSourceProcessor; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; + import lombok.extern.slf4j.Slf4j; +import com.google.common.base.Stopwatch; + @Slf4j public class RedshiftDataSourceClient extends CommonDataSourceClient { @@ -30,4 +40,41 @@ public class RedshiftDataSourceClient extends CommonDataSourceClient { super(baseConnectionParam, dbType); } + @Override + public Connection getConnection() { + RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam; + if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) { + return super.getConnection(); + } + return RedshiftDataSourceProcessor.getConnectionByIAM(connectionParam); + } + + @Override + public void checkClient() { + RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam; + Stopwatch stopwatch = Stopwatch.createStarted(); + String validationQuery = this.baseConnectionParam.getValidationQuery(); + if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) { + // Checking data source client + try { + this.jdbcTemplate.execute(validationQuery); + } catch (Exception e) { + throw new RuntimeException("JDBC connect failed", e); + } finally { + log.info("Time to execute check jdbc client with sql {} for {} ms ", + this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + } else { + try (Statement statement = getConnection().createStatement()) { + if (!statement.execute(validationQuery)) { + throw new SQLException("execute check redshift access key failed : " + validationQuery); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + log.info("Time to execute check redshift access key with sql {} for {} ms ", + this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } + } + } }