From b11431db9565093a766e27dc12240a27e44f2f4c Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 1 Feb 2023 20:41:05 +0800 Subject: [PATCH] Use single thread to refresh kerberos (#13456) --- .../common/thread/ThreadUtils.java | 9 ++ .../datasource/hive/HiveDataSourceClient.java | 87 ++---------- .../security/UserGroupInformationFactory.java | 129 ++++++++++++++++++ .../datasource/hive/utils/CommonUtil.java | 67 --------- 4 files changed, 149 insertions(+), 143 deletions(-) create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java delete mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java index 6eca0e783f..f73c40b960 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import lombok.experimental.UtilityClass; @@ -45,6 +46,14 @@ public class ThreadUtils { return Executors.newFixedThreadPool(threadsNum, threadFactory); } + public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) { + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat(threadName) + .setDaemon(true) + .build(); + return Executors.newSingleThreadScheduledExecutor(threadFactory); + } + /** * Sleep in given mills, this is not accuracy. */ diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java index f2539c7992..c7eef482da 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java @@ -21,44 +21,29 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_S import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; -import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil; +import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; import sun.security.krb5.Config; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import java.io.IOException; import java.lang.reflect.Field; import java.sql.Connection; import java.sql.SQLException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.JdbcTemplate; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - public class HiveDataSourceClient extends CommonDataSourceClient { private static final Logger logger = LoggerFactory.getLogger(HiveDataSourceClient.class); - private ScheduledExecutorService kerberosRenewalService; - - private Configuration hadoopConf; - private UserGroupInformation ugi; - private boolean retryGetConnection = true; - public HiveDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { super(baseConnectionParam, dbType); } @@ -66,18 +51,12 @@ public class HiveDataSourceClient extends CommonDataSourceClient { @Override protected void preInit() { logger.info("PreInit in {}", getClass().getName()); - this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build()); } @Override protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) { - logger.info("Create Configuration for hive configuration."); - this.hadoopConf = createHadoopConf(); - logger.info("Create Configuration success."); - logger.info("Create UserGroupInformation."); - this.ugi = createUserGroupInformation(baseConnectionParam.getUser()); + UserGroupInformationFactory.login(baseConnectionParam.getUser()); logger.info("Create ugi success."); this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType); @@ -108,61 +87,18 @@ public class HiveDataSourceClient extends CommonDataSourceClient { } } - private UserGroupInformation createUserGroupInformation(String username) { - String krb5File = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH); - String keytab = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH); - String principal = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME); - - try { - UserGroupInformation ugi = CommonUtil.createUGI(getHadoopConf(), principal, keytab, krb5File, username); - try { - Field isKeytabField = ugi.getClass().getDeclaredField("isKeytab"); - isKeytabField.setAccessible(true); - isKeytabField.set(ugi, true); - } catch (NoSuchFieldException | IllegalAccessException e) { - logger.warn(e.getMessage()); - } - - kerberosRenewalService.scheduleWithFixedDelay(() -> { - try { - ugi.checkTGTAndReloginFromKeytab(); - } catch (IOException e) { - logger.error("Check TGT and Renewal from Keytab error", e); - } - }, 5, 5, TimeUnit.MINUTES); - return ugi; - } catch (IOException e) { - throw new RuntimeException("createUserGroupInformation fail. ", e); - } - } - - protected Configuration createHadoopConf() { - Configuration hadoopConf = new Configuration(); - hadoopConf.setBoolean("ipc.client.fallback-to-simple-auth-allowed", true); - return hadoopConf; - } - - protected Configuration getHadoopConf() { - return this.hadoopConf; - } - @Override public Connection getConnection() { - try { - return dataSource.getConnection(); - } catch (SQLException e) { - boolean kerberosStartupState = - PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); - if (retryGetConnection && kerberosStartupState) { - retryGetConnection = false; - createUserGroupInformation(baseConnectionParam.getUser()); - Connection connection = getConnection(); - retryGetConnection = true; - return connection; + Connection connection = null; + while (connection == null) { + try { + connection = dataSource.getConnection(); + } catch (SQLException e) { + UserGroupInformationFactory.logout(baseConnectionParam.getUser()); + UserGroupInformationFactory.login(baseConnectionParam.getUser()); } - logger.error("get oneSessionDataSource Connection fail SQLException: {}", e.getMessage(), e); - return null; } + return connection; } @Override @@ -170,8 +106,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient { try { super.close(); } finally { - kerberosRenewalService.shutdown(); - this.ugi = null; + UserGroupInformationFactory.logout(baseConnectionParam.getUser()); } logger.info("Closed Hive datasource client."); diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java new file mode 100644 index 0000000000..168ff3bdca --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/security/UserGroupInformationFactory.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.hive.security; + +import static org.apache.dolphinscheduler.common.constants.Constants.JAVA_SECURITY_KRB5_CONF; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ResUploadType; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class UserGroupInformationFactory { + + private static final Map currentLoginTimesMap = new HashMap<>(); + + private static final Map userGroupInformationMap = new HashMap<>(); + + private static final ScheduledExecutorService kerberosRenewalService = + ThreadUtils.newSingleDaemonScheduledExecutorService("Hive-Kerberos-Renewal-Thread-"); + + static { + kerberosRenewalService.scheduleWithFixedDelay(() -> { + if (userGroupInformationMap.isEmpty()) { + return; + } + userGroupInformationMap.forEach((key, ugi) -> { + try { + if (ugi.isFromKeytab()) { + ugi.checkTGTAndReloginFromKeytab(); + } + log.info("Relogin from keytab success, user: {}", key); + } catch (Exception e) { + log.error("Relogin from keytab failed, user: {}", key, e); + } + }); + }, 0, 5, TimeUnit.MINUTES); + } + + public synchronized static UserGroupInformation login(String userName) { + UserGroupInformation userGroupInformation = userGroupInformationMap.get(userName); + if (userGroupInformation == null) { + if (!openKerberos()) { + userGroupInformation = createRemoteUser(userName); + } else { + userGroupInformation = createKerberosUser(); + } + userGroupInformationMap.put(userName, userGroupInformation); + } + currentLoginTimesMap.compute(userName, (k, v) -> v == null ? 1 : v + 1); + return userGroupInformation; + } + + public synchronized static void logout(String userName) { + Integer currentLoginTimes = currentLoginTimesMap.get(userName); + if (currentLoginTimes == null) { + return; + } + if (currentLoginTimes <= 1) { + currentLoginTimesMap.remove(userName); + userGroupInformationMap.remove(userName); + } else { + currentLoginTimesMap.put(userName, currentLoginTimes - 1); + } + } + + private static UserGroupInformation createRemoteUser(String userName) { + return UserGroupInformation.createRemoteUser(userName); + } + + private static UserGroupInformation createKerberosUser() { + String krb5File = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH); + String keytab = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH); + String principal = PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME); + if (StringUtils.isNotBlank(krb5File)) { + System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File); + } + + Configuration hadoopConf = new Configuration(); + hadoopConf.setBoolean("ipc.client.fallback-to-simple-auth-allowed", true); + hadoopConf.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS); + + try { + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation userGroupInformation = + UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keytab.trim()); + UserGroupInformation.setLoginUser(userGroupInformation); + return userGroupInformation; + } catch (IOException e) { + throw new RuntimeException("createUserGroupInformation fail. ", e); + } + } + + public static boolean openKerberos() { + String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE); + ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); + Boolean kerberosStartupState = + PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); + return resUploadType == ResUploadType.HDFS && kerberosStartupState; + } + +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java deleted file mode 100644 index be9ef1ea1f..0000000000 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/utils/CommonUtil.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.datasource.hive.utils; - -import static org.apache.dolphinscheduler.common.constants.Constants.JAVA_SECURITY_KRB5_CONF; - -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.ResUploadType; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; - -import java.io.IOException; -import java.util.Objects; - -import lombok.experimental.UtilityClass; - -@UtilityClass -public class CommonUtil { - - public static boolean getKerberosStartupState() { - String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE); - ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); - Boolean kerberosStartupState = - PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); - return resUploadType == ResUploadType.HDFS && kerberosStartupState; - } - - public static synchronized UserGroupInformation createUGI(Configuration configuration, String principal, - String keyTab, String krb5File, - String username) throws IOException { - if (getKerberosStartupState()) { - Objects.requireNonNull(keyTab); - if (StringUtils.isNotBlank(krb5File)) { - System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5File); - } - return loginKerberos(configuration, principal, keyTab); - } - return UserGroupInformation.createRemoteUser(username); - } - - public static synchronized UserGroupInformation loginKerberos(final Configuration config, final String principal, - final String keyTab) throws IOException { - config.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS); - UserGroupInformation.setConfiguration(config); - UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim()); - return UserGroupInformation.getCurrentUser(); - } - -}