
[Fix-10918] Close datasource when expire from guava cache (#11120)

* Close datasource when expire from guava cache
* Remove duplicate datasource in HiveDataSourceClient
Branch: 3.1.0-release
Author: Wenjun Ruan, committed 2 years ago via GitHub
Commit: 71cf7e1baa
Changed files (4):
  1. dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java (11 changed lines)
  2. dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java (25 changed lines)
  3. dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java (25 changed lines)
  4. dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java (3 changed lines)

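The core mechanism of the fix is a Guava removal listener that closes each cached DataSourceClient when its entry expires or is evicted, instead of letting the underlying connection pool leak. A minimal, self-contained sketch of that pattern; the FakeClient class, the key string, and the cache settings are illustrative, not DolphinScheduler code:

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;

// Minimal sketch: a cache whose values own resources that must be released
// when their entries expire or are evicted. Names are illustrative only.
public class ClosingCacheSketch {

    // Stand-in for DataSourceClient: any value that owns a closable resource.
    static class FakeClient implements AutoCloseable {
        private final String id;

        FakeClient(String id) {
            this.id = id;
        }

        @Override
        public void close() {
            System.out.println("Closing client " + id);
        }
    }

    public static void main(String[] args) throws Exception {
        Cache<String, FakeClient> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(1, TimeUnit.HOURS)
                // The removal listener runs when an entry is evicted, expired,
                // or explicitly invalidated, so the resource is always released.
                .removalListener((RemovalListener<String, FakeClient>) notification -> {
                    FakeClient client = notification.getValue();
                    if (client != null) {
                        client.close();
                    }
                })
                .maximumSize(100)
                .build();

        // get(key, loader) creates the client once and reuses it until removal.
        FakeClient client = cache.get("mysql@host:3306", () -> new FakeClient("mysql@host:3306"));
        cache.invalidate("mysql@host:3306"); // triggers the removal listener
    }
}

Because get(key, loader) builds the client at most once per key, callers share one pooled client until the entry is removed and the listener closes it.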
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java (11 changed lines)

@@ -27,13 +27,12 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.concurrent.TimeUnit;
-import javax.sql.DataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.JdbcTemplate;
 import com.google.common.base.Stopwatch;
+import com.zaxxer.hikari.HikariDataSource;
 public class CommonDataSourceClient implements DataSourceClient {
@@ -43,7 +42,7 @@ public class CommonDataSourceClient {
     public static final String COMMON_VALIDATION_QUERY = "select 1";
     protected final BaseConnectionParam baseConnectionParam;
-    protected DataSource dataSource;
+    protected HikariDataSource dataSource;
     protected JdbcTemplate jdbcTemplate;
     public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
@@ -113,8 +112,10 @@ public class CommonDataSourceClient implements DataSourceClient {
     @Override
     public void close() {
-        logger.info("do close dataSource.");
-        this.dataSource = null;
+        logger.info("do close dataSource {}.", baseConnectionParam.getDatabase());
+        try (HikariDataSource closedDatasource = dataSource) {
+            // only close the resource
+        }
         this.jdbcTemplate = null;
     }

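The rewritten close() uses an empty try-with-resources block: the compiler generates the call to HikariDataSource#close() on exit, and silently skips it when the reference is null. A minimal sketch of that idiom in isolation; the class below is illustrative, needs HikariCP on the classpath, and uses an unconfigured HikariDataSource only so the example runs without a database:

import com.zaxxer.hikari.HikariDataSource;

// Sketch of the close() idiom above: try-with-resources closes the pool exactly
// once and quietly does nothing if the reference happens to be null.
public class CloseDataSourceSketch {

    // Assumed field, mirroring CommonDataSourceClient#dataSource.
    private HikariDataSource dataSource = new HikariDataSource();

    public void close() {
        try (HikariDataSource closedDataSource = dataSource) {
            // No body needed: the block exists only so closedDataSource.close()
            // is invoked on exit (and skipped when closedDataSource is null).
        }
    }

    public static void main(String[] args) {
        new CloseDataSourceSketch().close();
    }
}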
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java (25 changed lines)

@@ -17,8 +17,6 @@
 package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
@@ -27,23 +25,32 @@ import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
 import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
 public class DataSourceClientProvider {
     private static final Logger logger = LoggerFactory.getLogger(DataSourceClientProvider.class);
-    private static long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
+    private static final long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
     private static final Cache<String, DataSourceClient> uniqueId2dataSourceClientCache = CacheBuilder.newBuilder()
             .expireAfterWrite(duration, TimeUnit.HOURS)
-            .maximumSize(100)
-            .build();
+            .removalListener((RemovalListener<String, DataSourceClient>) notification -> {
+                try (DataSourceClient closedClient = notification.getValue()) {
+                    logger.info("Datasource: {} is removed from cache due to expire", notification.getKey());
+                }
+            })
+            .maximumSize(100)
+            .build();
     private DataSourcePluginManager dataSourcePluginManager;
     private DataSourceClientProvider() {
@@ -61,7 +68,7 @@ public class DataSourceClientProvider {
     public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
         BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
         String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
-        logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);
+        logger.info("Get connection from datasource {}", datasourceUniqueId);
         DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
             Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();

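One property of Guava caches worth keeping in mind for this change: expired entries are not closed by a background thread. The removal listener runs during later cache operations (or an explicit cleanUp()), so an idle datasource may be closed somewhat after the configured expiry. A small sketch with a fake ticker that makes this visible; all names here are illustrative:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;

// Sketch: expiry-driven removal is lazy. The listener fires during a later cache
// operation (or an explicit cleanUp()), not at the exact moment the entry expires.
public class LazyExpirySketch {

    public static void main(String[] args) {
        AtomicLong fakeNanos = new AtomicLong();
        Ticker fakeTicker = new Ticker() {
            @Override
            public long read() {
                return fakeNanos.get();
            }
        };

        Cache<String, String> cache = CacheBuilder.newBuilder()
                .ticker(fakeTicker)
                .expireAfterWrite(1, TimeUnit.HOURS)
                .removalListener((RemovalListener<String, String>) notification ->
                        System.out.println("removed: " + notification.getKey() + " (" + notification.getCause() + ")"))
                .build();

        cache.put("client-a", "value");
        fakeNanos.addAndGet(TimeUnit.HOURS.toNanos(2)); // jump past the expiry window

        // Nothing has been printed yet; the expiry is only detected here:
        cache.cleanUp(); // prints: removed: client-a (EXPIRED)
    }
}

If prompt closing mattered, a scheduled cleanUp() would be one option; the cache in this patch relies on ordinary traffic to trigger maintenance.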
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java (25 changed lines)

@@ -17,8 +17,10 @@
 package org.apache.dolphinscheduler.plugin.datasource.hive;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.zaxxer.hikari.HikariDataSource;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;
 import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
 import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
 import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil;
@@ -27,11 +29,9 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.security.krb5.Config;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -41,7 +41,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import sun.security.krb5.Config;
 public class HiveDataSourceClient extends CommonDataSourceClient {
@@ -50,7 +55,6 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
     private ScheduledExecutorService kerberosRenewalService;
     private Configuration hadoopConf;
-    protected HikariDataSource oneSessionDataSource;
     private UserGroupInformation ugi;
     private boolean retryGetConnection = true;
@@ -76,7 +80,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         logger.info("Create ugi success.");
         super.initClient(baseConnectionParam, dbType);
-        this.oneSessionDataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
+        this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
         logger.info("Init {} success.", getClass().getName());
     }
@@ -144,7 +148,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
     @Override
     public Connection getConnection() {
         try {
-            return oneSessionDataSource.getConnection();
+            return dataSource.getConnection();
         } catch (SQLException e) {
             boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
             if (retryGetConnection && kerberosStartupState) {
@@ -166,8 +170,5 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         logger.info("close {}.", this.getClass().getSimpleName());
         kerberosRenewalService.shutdown();
         this.ugi = null;
-        this.oneSessionDataSource.close();
-        this.oneSessionDataSource = null;
     }
 }

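The second bullet of the commit ("Remove duplicate datasource in HiveDataSourceClient") drops the subclass-only oneSessionDataSource field and reuses the protected dataSource inherited from CommonDataSourceClient, so one pool is created and one close() path releases it. A simplified sketch of the hazard a duplicated field creates in general; these classes are illustrative, not the project's:

// Sketch: if a subclass keeps its own pool field, the parent's close() only
// releases the field it knows about, and the subclass must duplicate the
// close logic as well. Reusing the inherited field avoids that.
class ParentClient implements AutoCloseable {
    protected AutoCloseable dataSource;

    @Override
    public void close() throws Exception {
        if (dataSource != null) {
            dataSource.close(); // only closes what the parent knows about
        }
    }
}

class ChildClientBefore extends ParentClient {
    // Duplicate field: ParentClient.close() cannot reach this pool, so the
    // subclass would need its own close() on top of the parent's.
    private AutoCloseable oneSessionDataSource = () -> System.out.println("closed (child-only pool)");
}

class ChildClientAfter extends ParentClient {
    ChildClientAfter() {
        // Reuse the inherited field, so ParentClient.close() releases the pool.
        this.dataSource = () -> System.out.println("closed (shared field)");
    }
}

public class DuplicateFieldSketch {
    public static void main(String[] args) throws Exception {
        new ChildClientBefore().close(); // prints nothing: the child-only pool is missed
        new ChildClientAfter().close();  // prints "closed (shared field)"
    }
}

With the shared field, the parent's close() is sufficient; with the duplicate, every subclass has to remember to close its own copy.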
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java (3 changed lines)

@@ -19,10 +19,11 @@ package org.apache.dolphinscheduler.spi.datasource;
 import java.sql.Connection;
-public interface DataSourceClient {
+public interface DataSourceClient extends AutoCloseable {
     void checkClient();
+    @Override
     void close();
     Connection getConnection();

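Extending AutoCloseable is what lets the cache's removal listener, and any other caller, use try-with-resources on a DataSourceClient; redeclaring close() without a checked exception also spares callers a forced try/catch. A small illustrative sketch; the Fake implementation below is not part of DolphinScheduler:

import java.sql.Connection;

// Sketch: with DataSourceClient extending AutoCloseable, a short-lived client
// can be managed with try-with-resources. Names are illustrative only.
public class AutoCloseableClientSketch {

    // Mirrors the interface in the diff above.
    interface DataSourceClient extends AutoCloseable {
        void checkClient();

        @Override
        void close(); // narrowed: no checked exception, so callers need no try/catch

        Connection getConnection();
    }

    static class FakeDataSourceClient implements DataSourceClient {
        @Override
        public void checkClient() {
        }

        @Override
        public void close() {
            System.out.println("client closed");
        }

        @Override
        public Connection getConnection() {
            return null; // a real client would hand out a pooled connection
        }
    }

    public static void main(String[] args) {
        // close() runs automatically when the block exits, even on exceptions.
        try (DataSourceClient client = new FakeDataSourceClient()) {
            client.checkClient();
        }
    }
}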