Browse Source

[Fix-9352][datasource-api]data source client uses a cache with an expiration time (#9353)

* datasource client cache with expiration time

* remove unused import

* fixed unit test

* fix

* fix

* fix

* fix

* fix

* fix

* fix
3.0.0/version-upgrade
zhuxt2015 2 years ago committed by GitHub
parent
commit
206b7c1c51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
  2. 13
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtilsTest.java
  3. 25
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
  4. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

22
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java

@ -17,25 +17,33 @@
package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataSourceClientProvider {
private static final Logger logger = LoggerFactory.getLogger(DataSourceClientProvider.class);
private static final Map<String, DataSourceClient> uniqueId2dataSourceClientMap = new ConcurrentHashMap<>();
private static long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
private static final Cache<String, DataSourceClient> uniqueId2dataSourceClientCache = CacheBuilder.newBuilder()
.expireAfterWrite(duration, TimeUnit.HOURS)
.maximumSize(100)
.build();
private DataSourcePluginManager dataSourcePluginManager;
private DataSourceClientProvider() {
@ -50,12 +58,12 @@ public class DataSourceClientProvider {
return DataSourceClientProviderHolder.INSTANCE;
}
public Connection getConnection(DbType dbType, ConnectionParam connectionParam) {
public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);
DataSourceClient dataSourceClient = uniqueId2dataSourceClientMap.computeIfAbsent(datasourceUniqueId, $ -> {
DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();
DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());
if (null == dataSourceChannel) {

13
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtilsTest.java

@ -21,25 +21,31 @@ import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLC
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Class.class, DriverManager.class, MySQLDataSourceProcessor.class, DataSourceClientProvider.class, PasswordUtils.class, CommonUtils.class})
@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils")
@PrepareForTest({Class.class, DriverManager.class, MySQLDataSourceProcessor.class, DataSourceClientProvider.class, PasswordUtils.class, CommonUtils.class, PropertyUtils.class})
public class DataSourceUtilsTest {
@Test
@ -85,8 +91,9 @@ public class DataSourceUtilsTest {
}
@Test
public void testGetConnection() {
public void testGetConnection() throws ExecutionException {
PowerMockito.mockStatic(PropertyUtils.class);
PowerMockito.when(PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24L)).thenReturn(24L);
PowerMockito.mockStatic(DataSourceClientProvider.class);
DataSourceClientProvider clientProvider = PowerMockito.mock(DataSourceClientProvider.class);
PowerMockito.when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider);

25
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java

@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.plugin.datasource.hive;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import org.apache.dolphinscheduler.plugin.datasource.utils.CommonUtil;
@ -29,9 +26,11 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.security.krb5.Config;
import java.io.IOException;
import java.lang.reflect.Field;
@ -41,12 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.HikariDataSource;
import sun.security.krb5.Config;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*;
public class HiveDataSourceClient extends CommonDataSourceClient {
@ -57,6 +51,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
private Configuration hadoopConf;
protected HikariDataSource oneSessionDataSource;
private UserGroupInformation ugi;
private boolean retryGetConnection = true;
public HiveDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
super(baseConnectionParam, dbType);
@ -149,6 +144,14 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
try {
return oneSessionDataSource.getConnection();
} catch (SQLException e) {
boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
if (retryGetConnection && kerberosStartupState) {
retryGetConnection = false;
createUserGroupInformation(baseConnectionParam.getUser());
Connection connection = getConnection();
retryGetConnection = true;
return connection;
}
logger.error("get oneSessionDataSource Connection fail SQLException: {}", e.getMessage(), e);
return null;
}

5
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -61,6 +61,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -557,8 +558,8 @@ public class DataxTask extends AbstractTaskExecutor {
for (int i = 1; i <= num; i++) {
columnNames[i - 1] = md.getColumnName(i);
}
} catch (SQLException e) {
logger.warn(e.getMessage(), e);
} catch (SQLException | ExecutionException e) {
logger.error(e.getMessage(), e);
return null;
}

Loading…
Cancel
Save