From 206b7c1c51e84f2e937748bc94191b23f95298e9 Mon Sep 17 00:00:00 2001 From: zhuxt2015 <594754793@qq.com> Date: Wed, 27 Apr 2022 16:56:24 +0800 Subject: [PATCH] [Fix-9352][datasource-api]data source client uses a cache with an expiration time (#9353) * datasource client cache with expiration time * remove unused import * fixed unit test * fix * fix * fix * fix * fix * fix * fix --- .../api/plugin/DataSourceClientProvider.java | 22 ++++++++++------ .../api/utils/DataSourceUtilsTest.java | 13 +++++++--- .../datasource/hive/HiveDataSourceClient.java | 25 +++++++++++-------- .../plugin/task/datax/DataxTask.java | 5 ++-- 4 files changed, 42 insertions(+), 23 deletions(-) diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java index fb17a0da8a..6c2f8b1ac6 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java @@ -17,25 +17,33 @@ package org.apache.dolphinscheduler.plugin.datasource.api.plugin; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; import org.apache.dolphinscheduler.spi.datasource.DataSourceClient; import org.apache.dolphinscheduler.spi.enums.DbType; +import org.apache.dolphinscheduler.spi.utils.PropertyUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.Connection; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DataSourceClientProvider { private static final Logger logger = LoggerFactory.getLogger(DataSourceClientProvider.class); - private static final Map uniqueId2dataSourceClientMap = new ConcurrentHashMap<>(); - + private static long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24); + private static final Cache uniqueId2dataSourceClientCache = CacheBuilder.newBuilder() + .expireAfterWrite(duration, TimeUnit.HOURS) + .maximumSize(100) + .build(); private DataSourcePluginManager dataSourcePluginManager; private DataSourceClientProvider() { @@ -50,12 +58,12 @@ public class DataSourceClientProvider { return DataSourceClientProviderHolder.INSTANCE; } - public Connection getConnection(DbType dbType, ConnectionParam connectionParam) { + public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException { BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam; String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType); logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId); - DataSourceClient dataSourceClient = uniqueId2dataSourceClientMap.computeIfAbsent(datasourceUniqueId, $ -> { + DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> { Map dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap(); DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp()); if (null == dataSourceChannel) { diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtilsTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtilsTest.java index 81611903c2..c9c02ba188 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtilsTest.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtilsTest.java @@ -21,25 +21,31 @@ import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLC import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; +import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import java.sql.Connection; import java.sql.DriverManager; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor; import org.powermock.modules.junit4.PowerMockRunner; @RunWith(PowerMockRunner.class) -@PrepareForTest({Class.class, DriverManager.class, MySQLDataSourceProcessor.class, DataSourceClientProvider.class, PasswordUtils.class, CommonUtils.class}) +@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils") +@PrepareForTest({Class.class, DriverManager.class, MySQLDataSourceProcessor.class, DataSourceClientProvider.class, PasswordUtils.class, CommonUtils.class, PropertyUtils.class}) public class DataSourceUtilsTest { @Test @@ -85,8 +91,9 @@ public class DataSourceUtilsTest { } @Test - public void testGetConnection() { - + public void testGetConnection() throws ExecutionException { + PowerMockito.mockStatic(PropertyUtils.class); + PowerMockito.when(PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24L)).thenReturn(24L); PowerMockito.mockStatic(DataSourceClientProvider.class); DataSourceClientProvider clientProvider = PowerMockito.mock(DataSourceClientProvider.class); PowerMockito.when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider); diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java index 642e06af8a..18a0490033 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java @@ -17,10 +17,7 @@ package org.apache.dolphinscheduler.plugin.datasource.hive; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE; - +import com.zaxxer.hikari.HikariDataSource; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; import org.apache.dolphinscheduler.plugin.datasource.utils.CommonUtil; @@ -29,9 +26,11 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.security.krb5.Config; import java.io.IOException; import java.lang.reflect.Field; @@ -41,12 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.zaxxer.hikari.HikariDataSource; - -import sun.security.krb5.Config; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*; public class HiveDataSourceClient extends CommonDataSourceClient { @@ -57,6 +51,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient { private Configuration hadoopConf; protected HikariDataSource oneSessionDataSource; private UserGroupInformation ugi; + private boolean retryGetConnection = true; public HiveDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { super(baseConnectionParam, dbType); @@ -149,6 +144,14 @@ public class HiveDataSourceClient extends CommonDataSourceClient { try { return oneSessionDataSource.getConnection(); } catch (SQLException e) { + boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); + if (retryGetConnection && kerberosStartupState) { + retryGetConnection = false; + createUserGroupInformation(baseConnectionParam.getUser()); + Connection connection = getConnection(); + retryGetConnection = true; + return connection; + } logger.error("get oneSessionDataSource Connection fail SQLException: {}", e.getMessage(), e); return null; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index e24276701d..5627edad18 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -61,6 +61,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -557,8 +558,8 @@ public class DataxTask extends AbstractTaskExecutor { for (int i = 1; i <= num; i++) { columnNames[i - 1] = md.getColumnName(i); } - } catch (SQLException e) { - logger.warn(e.getMessage(), e); + } catch (SQLException | ExecutionException e) { + logger.error(e.getMessage(), e); return null; }