Browse Source

datasource test and sql task Remove connection pool issues is #14179 (#14193)

* datasource test and sql task Remove connection pool issues is #14179

* datasource test and sql task Remove connection pool issues is #14179 uniform style

* datasource test and sql task Remove connection pool issues is #14179 uniform style by 20230720

* datasource test and sql task Remove connection pool issues is #14179 uniform style by 20230720

---------

Co-authored-by: xuhui <xuhui@3h1china.com>
Co-authored-by: Jay Chung <zhongjiajie955@gmail.com>
3.2.1-prepare
xuhui1231 1 year ago committed by GitHub
parent
commit
e4fb5b30a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
  2. 34
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java
  3. 30
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
  4. 27
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java
  5. 8
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java
  6. 47
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java

42
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.plugin.datasource.api.client; package org.apache.dolphinscheduler.plugin.datasource.api.client;
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient; import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
@ -25,15 +24,13 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.zaxxer.hikari.HikariDataSource;
@Slf4j @Slf4j
public class CommonDataSourceClient implements DataSourceClient { public class CommonDataSourceClient implements DataSourceClient {
@ -42,8 +39,7 @@ public class CommonDataSourceClient implements DataSourceClient {
public static final String COMMON_VALIDATION_QUERY = "select 1"; public static final String COMMON_VALIDATION_QUERY = "select 1";
protected final BaseConnectionParam baseConnectionParam; protected final BaseConnectionParam baseConnectionParam;
protected HikariDataSource dataSource; protected Connection connection;
protected JdbcTemplate jdbcTemplate;
public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
this.baseConnectionParam = baseConnectionParam; this.baseConnectionParam = baseConnectionParam;
@ -63,8 +59,7 @@ public class CommonDataSourceClient implements DataSourceClient {
} }
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) { protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
this.dataSource = JDBCDataSourceProvider.createJdbcDataSource(baseConnectionParam, dbType); this.connection = buildConn(baseConnectionParam);
this.jdbcTemplate = new JdbcTemplate(dataSource);
} }
protected void checkUser(BaseConnectionParam baseConnectionParam) { protected void checkUser(BaseConnectionParam baseConnectionParam) {
@ -73,6 +68,20 @@ public class CommonDataSourceClient implements DataSourceClient {
} }
} }
private Connection buildConn(BaseConnectionParam baseConnectionParam) {
Connection conn = null;
try {
Class.forName(baseConnectionParam.getDriverClassName());
conn = DriverManager.getConnection(baseConnectionParam.getJdbcUrl(), baseConnectionParam.getUser(),
baseConnectionParam.getPassword());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Driver load fail", e);
} catch (SQLException e) {
throw new RuntimeException("JDBC connect failed", e);
}
return conn;
}
protected void setDefaultUsername(BaseConnectionParam baseConnectionParam) { protected void setDefaultUsername(BaseConnectionParam baseConnectionParam) {
baseConnectionParam.setUser(COMMON_USER); baseConnectionParam.setUser(COMMON_USER);
} }
@ -92,7 +101,7 @@ public class CommonDataSourceClient implements DataSourceClient {
// Checking data source client // Checking data source client
Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch = Stopwatch.createStarted();
try { try {
this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery()); this.connection.prepareStatement(this.baseConnectionParam.getValidationQuery()).executeQuery();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("JDBC connect failed", e); throw new RuntimeException("JDBC connect failed", e);
} finally { } finally {
@ -104,20 +113,21 @@ public class CommonDataSourceClient implements DataSourceClient {
@Override @Override
public Connection getConnection() { public Connection getConnection() {
try { try {
return this.dataSource.getConnection(); return connection.isClosed() ? buildConn(baseConnectionParam) : connection;
} catch (SQLException e) { } catch (SQLException e) {
log.error("get druidDataSource Connection fail SQLException: {}", e.getMessage(), e); throw new RuntimeException("get conn is fail", e);
return null;
} }
} }
@Override @Override
public void close() { public void close() {
log.info("do close dataSource {}.", baseConnectionParam.getDatabase()); log.info("do close connection {}.", baseConnectionParam.getDatabase());
try (HikariDataSource closedDatasource = dataSource) { try {
// only close the resource connection.close();
} catch (SQLException e) {
log.info("colse connection fail");
throw new RuntimeException(e);
} }
this.jdbcTemplate = null;
} }
} }

34
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java

@ -25,14 +25,9 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Stopwatch;
@Slf4j @Slf4j
public class AzureSQLDataSourceClient extends CommonDataSourceClient { public class AzureSQLDataSourceClient extends CommonDataSourceClient {
@ -49,33 +44,4 @@ public class AzureSQLDataSourceClient extends CommonDataSourceClient {
return AzureSQLDataSourceProcessor.tokenGetConnection(connectionParam); return AzureSQLDataSourceProcessor.tokenGetConnection(connectionParam);
} }
@Override
public void checkClient() {
AzureSQLConnectionParam connectionParam = (AzureSQLConnectionParam) this.baseConnectionParam;
Stopwatch stopwatch = Stopwatch.createStarted();
String validationQuery = this.baseConnectionParam.getValidationQuery();
if (!connectionParam.getMode().equals(AzureSQLAuthMode.ACCESSTOKEN)) {
// Checking data source client
try {
this.jdbcTemplate.execute(validationQuery);
} catch (Exception e) {
throw new RuntimeException("JDBC connect failed", e);
} finally {
log.info("Time to execute check jdbc client with sql {} for {} ms ",
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
} else {
try (Statement statement = getConnection().createStatement()) {
if (!statement.execute(validationQuery)) {
throw new SQLException("execute check azure sql token client failed : " + validationQuery);
}
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
log.info("Time to execute check azure sql token client with sql {} for {} ms ",
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
}
}
} }

30
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java

@ -23,7 +23,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SEC
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory; import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
@ -33,13 +32,9 @@ import sun.security.krb5.Config;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
@Slf4j @Slf4j
public class HiveDataSourceClient extends CommonDataSourceClient { public class HiveDataSourceClient extends CommonDataSourceClient {
@ -52,17 +47,6 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
log.info("PreInit in {}", getClass().getName()); log.info("PreInit in {}", getClass().getName());
} }
@Override
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
log.info("Create UserGroupInformation.");
UserGroupInformationFactory.login(baseConnectionParam.getUser());
log.info("Create ugi success.");
this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
this.jdbcTemplate = new JdbcTemplate(dataSource);
log.info("Init {} success.", getClass().getName());
}
@Override @Override
protected void checkEnv(BaseConnectionParam baseConnectionParam) { protected void checkEnv(BaseConnectionParam baseConnectionParam) {
super.checkEnv(baseConnectionParam); super.checkEnv(baseConnectionParam);
@ -86,20 +70,6 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
} }
} }
@Override
public Connection getConnection() {
Connection connection = null;
while (connection == null) {
try {
connection = dataSource.getConnection();
} catch (SQLException e) {
UserGroupInformationFactory.logout(baseConnectionParam.getUser());
UserGroupInformationFactory.login(baseConnectionParam.getUser());
}
}
return connection;
}
@Override @Override
public void close() { public void close() {
try { try {

27
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java

@ -18,17 +18,11 @@
package org.apache.dolphinscheduler.plugin.datasource.kyuubi; package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
@Slf4j @Slf4j
public class KyuubiDataSourceClient extends CommonDataSourceClient { public class KyuubiDataSourceClient extends CommonDataSourceClient {
@ -41,32 +35,11 @@ public class KyuubiDataSourceClient extends CommonDataSourceClient {
log.info("PreInit in {}", getClass().getName()); log.info("PreInit in {}", getClass().getName());
} }
@Override
protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
this.jdbcTemplate = new JdbcTemplate(dataSource);
log.info("Init {} success.", getClass().getName());
}
@Override @Override
protected void checkEnv(BaseConnectionParam baseConnectionParam) { protected void checkEnv(BaseConnectionParam baseConnectionParam) {
super.checkEnv(baseConnectionParam); super.checkEnv(baseConnectionParam);
} }
@Override
public Connection getConnection() {
Connection connection = null;
while (connection == null) {
try {
connection = dataSource.getConnection();
} catch (SQLException e) {
log.error("Failed to get Kyuubi Connection.", e);
}
}
return connection;
}
@Override @Override
public void close() { public void close() {
super.close(); super.close();

8
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.plugin.datasource.kyuubi; package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
import org.apache.dolphinscheduler.plugin.datasource.kyuubi.param.KyuubiConnectionParam; import org.apache.dolphinscheduler.plugin.datasource.kyuubi.param.KyuubiConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection; import java.sql.Connection;
@ -49,13 +48,6 @@ public class KyuubiDataSourceClientTest {
Mockito.verify(kyuubiDataSourceClient).checkEnv(kyuubiConnectionParam); Mockito.verify(kyuubiDataSourceClient).checkEnv(kyuubiConnectionParam);
} }
@Test
public void testInitClient() {
KyuubiConnectionParam kyuubiConnectionParam = new KyuubiConnectionParam();
kyuubiDataSourceClient.initClient(kyuubiConnectionParam, DbType.KYUUBI);
Mockito.verify(kyuubiDataSourceClient).initClient(kyuubiConnectionParam, DbType.KYUUBI);
}
@Test @Test
public void testCheckClient() { public void testCheckClient() {
kyuubiDataSourceClient.checkClient(); kyuubiDataSourceClient.checkClient();

47
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java

@ -18,21 +18,11 @@
package org.apache.dolphinscheduler.plugin.datasource.redshift; package org.apache.dolphinscheduler.plugin.datasource.redshift;
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftAuthMode;
import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftConnectionParam;
import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftDataSourceProcessor;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Stopwatch;
@Slf4j @Slf4j
public class RedshiftDataSourceClient extends CommonDataSourceClient { public class RedshiftDataSourceClient extends CommonDataSourceClient {
@ -40,41 +30,4 @@ public class RedshiftDataSourceClient extends CommonDataSourceClient {
super(baseConnectionParam, dbType); super(baseConnectionParam, dbType);
} }
@Override
public Connection getConnection() {
RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
return super.getConnection();
}
return RedshiftDataSourceProcessor.getConnectionByIAM(connectionParam);
}
@Override
public void checkClient() {
RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
Stopwatch stopwatch = Stopwatch.createStarted();
String validationQuery = this.baseConnectionParam.getValidationQuery();
if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
// Checking data source client
try {
this.jdbcTemplate.execute(validationQuery);
} catch (Exception e) {
throw new RuntimeException("JDBC connect failed", e);
} finally {
log.info("Time to execute check jdbc client with sql {} for {} ms ",
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
} else {
try (Statement statement = getConnection().createStatement()) {
if (!statement.execute(validationQuery)) {
throw new SQLException("execute check redshift access key failed : " + validationQuery);
}
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
log.info("Time to execute check redshift access key with sql {} for {} ms ",
this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
}
}
} }

Loading…
Cancel
Save