From 71cf7e1baa7b813ac381c37c0345e837d2c07557 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sat, 23 Jul 2022 20:43:37 +0800 Subject: [PATCH] [Fix-10918] Close datasource when it expires from the Guava cache (#11120) * Close datasource when it expires from the Guava cache * Remove duplicate datasource in HiveDataSourceClient --- .../api/client/CommonDataSourceClient.java | 11 ++++---- .../api/plugin/DataSourceClientProvider.java | 25 ++++++++++++------- .../datasource/hive/HiveDataSourceClient.java | 25 ++++++++++--------- .../spi/datasource/DataSourceClient.java | 3 ++- 4 files changed, 37 insertions(+), 27 deletions(-) diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java index 37af397758..50de82f93d 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java @@ -27,13 +27,12 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.concurrent.TimeUnit; -import javax.sql.DataSource; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.JdbcTemplate; import com.google.common.base.Stopwatch; +import com.zaxxer.hikari.HikariDataSource; public class CommonDataSourceClient implements DataSourceClient { @@ -43,7 +42,7 @@ public class CommonDataSourceClient implements DataSourceClient { public static final String COMMON_VALIDATION_QUERY = "select 1"; protected final BaseConnectionParam baseConnectionParam; - protected DataSource dataSource; + 
protected HikariDataSource dataSource; protected JdbcTemplate jdbcTemplate; public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { @@ -113,8 +112,10 @@ public class CommonDataSourceClient implements DataSourceClient { @Override public void close() { - logger.info("do close dataSource."); - this.dataSource = null; + logger.info("do close dataSource {}.", baseConnectionParam.getDatabase()); + try (HikariDataSource closedDatasource = dataSource) { + // only close the resource + } this.jdbcTemplate = null; } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java index 6c2f8b1ac6..a849fa6b48 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.plugin.datasource.api.plugin; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; @@ -27,23 +25,32 @@ import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; import org.apache.dolphinscheduler.spi.datasource.DataSourceClient; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.PropertyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import 
java.sql.Connection; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; public class DataSourceClientProvider { private static final Logger logger = LoggerFactory.getLogger(DataSourceClientProvider.class); - private static long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24); + private static final long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24); private static final Cache uniqueId2dataSourceClientCache = CacheBuilder.newBuilder() - .expireAfterWrite(duration, TimeUnit.HOURS) - .maximumSize(100) - .build(); + .expireAfterWrite(duration, TimeUnit.HOURS) + .removalListener((RemovalListener) notification -> { + try (DataSourceClient closedClient = notification.getValue()) { + logger.info("Datasource: {} is removed from cache due to expire", notification.getKey()); + } + }) + .maximumSize(100) + .build(); private DataSourcePluginManager dataSourcePluginManager; private DataSourceClientProvider() { @@ -61,7 +68,7 @@ public class DataSourceClientProvider { public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException { BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam; String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType); - logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId); + logger.info("Get connection from datasource {}", datasourceUniqueId); DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> { Map dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap(); diff --git 
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java index dd4f7e89ca..13c1263fa2 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java @@ -17,8 +17,10 @@ package org.apache.dolphinscheduler.plugin.datasource.hive; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.zaxxer.hikari.HikariDataSource; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH; + import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil; @@ -27,11 +29,9 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.security.krb5.Config; import java.io.IOException; import java.lang.reflect.Field; @@ -41,7 +41,12 @@ import 
java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import sun.security.krb5.Config; public class HiveDataSourceClient extends CommonDataSourceClient { @@ -50,7 +55,6 @@ public class HiveDataSourceClient extends CommonDataSourceClient { private ScheduledExecutorService kerberosRenewalService; private Configuration hadoopConf; - protected HikariDataSource oneSessionDataSource; private UserGroupInformation ugi; private boolean retryGetConnection = true; @@ -76,7 +80,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient { logger.info("Create ugi success."); super.initClient(baseConnectionParam, dbType); - this.oneSessionDataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType); + this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType); logger.info("Init {} success.", getClass().getName()); } @@ -144,7 +148,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient { @Override public Connection getConnection() { try { - return oneSessionDataSource.getConnection(); + return dataSource.getConnection(); } catch (SQLException e) { boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); if (retryGetConnection && kerberosStartupState) { @@ -166,8 +170,5 @@ public class HiveDataSourceClient extends CommonDataSourceClient { logger.info("close {}.", this.getClass().getSimpleName()); kerberosRenewalService.shutdown(); this.ugi = null; - - this.oneSessionDataSource.close(); - this.oneSessionDataSource = null; } } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java index 879d198284..82eb1f02cb 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java @@ -19,10 +19,11 @@ package org.apache.dolphinscheduler.spi.datasource; import java.sql.Connection; -public interface DataSourceClient { +public interface DataSourceClient extends AutoCloseable { void checkClient(); + @Override void close(); Connection getConnection();