|
|
|
@ -4,8 +4,13 @@ import com.fr.io.base.provider.impl.ConfigRepositoryFactory;
|
|
|
|
|
import com.fr.io.context.info.RepositoryProfile; |
|
|
|
|
import com.fr.io.repository.ResourceRepository; |
|
|
|
|
import com.fr.log.FineLoggerFactory; |
|
|
|
|
import com.fr.plugin.context.PluginContexts; |
|
|
|
|
import com.fr.plugin.context.PluginMarker; |
|
|
|
|
import com.fr.plugin.hdfs.repository.HDFSFactoryProvider; |
|
|
|
|
import com.fr.stable.StableUtils; |
|
|
|
|
import com.fr.stable.StringUtils; |
|
|
|
|
import com.fr.stable.project.ProjectConstants; |
|
|
|
|
import com.fr.workspace.WorkContext; |
|
|
|
|
import org.apache.hadoop.conf.Configuration; |
|
|
|
|
import org.apache.hadoop.fs.FileSystem; |
|
|
|
|
import org.apache.hadoop.fs.Path; |
|
|
|
@ -15,13 +20,14 @@ import org.apache.hadoop.security.SecurityUtil;
|
|
|
|
|
import org.apache.hadoop.security.UserGroupInformation; |
|
|
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
|
import java.lang.reflect.Method; |
|
|
|
|
import java.net.URI; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Created by rinoux on 2018/8/10. |
|
|
|
|
*/ |
|
|
|
|
public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> { |
|
|
|
|
public static final String IDENTITY = "HDFS"; |
|
|
|
|
static final String IDENTITY = "HDFS"; |
|
|
|
|
|
|
|
|
|
private static final String HDFS_SCHEMA = "hdfs://"; |
|
|
|
|
private static final String DEFAULT_HOST = "localhost"; |
|
|
|
@ -81,15 +87,24 @@ public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> {
|
|
|
|
|
String principal = config.getPrincipal(); |
|
|
|
|
String krb5Conf = config.getKrbConf(); |
|
|
|
|
|
|
|
|
|
FineLoggerFactory.getLogger().debug("[HDFS REPOSITORY] Host:{}", host); |
|
|
|
|
FineLoggerFactory.getLogger().debug("[HDFS REPOSITORY] Port:{}", port); |
|
|
|
|
FineLoggerFactory.getLogger().debug("[HDFS REPOSITORY] Principal:{}", principal); |
|
|
|
|
FineLoggerFactory.getLogger().debug("[HDFS REPOSITORY] KeyTab:{}", config.getKeyTab()); |
|
|
|
|
FineLoggerFactory.getLogger().debug("[HDFS REPOSITORY] krb5.conf:{}", krb5Conf); |
|
|
|
|
|
|
|
|
|
//开启了kerberos验证
|
|
|
|
|
boolean kerberos = kerberosAuthenticated(config); |
|
|
|
|
if (kerberos) { |
|
|
|
|
|
|
|
|
|
//是否需要kerberos验证
|
|
|
|
|
boolean needKrbAuth = needKrbAuth(config); |
|
|
|
|
|
|
|
|
|
if (needKrbAuth) { |
|
|
|
|
try { |
|
|
|
|
System.setProperty("java.security.krb5.conf", krb5Conf); |
|
|
|
|
conf.set("hadoop.security.authentication", "kerberos"); |
|
|
|
|
|
|
|
|
|
processConfForPrincipal(conf, principal); |
|
|
|
|
//需要重新刷新一下让krb5.conf配置生效
|
|
|
|
|
refreshConfig(); |
|
|
|
|
|
|
|
|
|
//类似OSGI下,类加载需要设置SecurityUtil.setSecurityInfoProviders(new AnnotatedSecurityInfo());
|
|
|
|
|
//refer to https://stackoverflow.com/questions/37608049/how-to-connect-with-hdfs-via-kerberos-from-osgi-bundles
|
|
|
|
|
SecurityUtil.setSecurityInfoProviders(new AnnotatedSecurityInfo()); |
|
|
|
@ -102,12 +117,12 @@ public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> {
|
|
|
|
|
kerberosAuthModeSet = true; |
|
|
|
|
} |
|
|
|
|
} else if (kerberosAuthModeSet) { |
|
|
|
|
//如果不需要Kerberos认证,要设置认证方式和校验方式为默认
|
|
|
|
|
conf.set("hadoop.security.authorization", "false"); |
|
|
|
|
conf.set("hadoop.security.authentication", "simple"); |
|
|
|
|
} |
|
|
|
|
try { |
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(principal) && !kerberos) { |
|
|
|
|
if (StringUtils.isNotEmpty(principal) && !needKrbAuth) { |
|
|
|
|
return FileSystem.newInstance(URI.create(hdfsUrl), conf, principal); |
|
|
|
|
} else { |
|
|
|
|
return FileSystem.newInstance(URI.create(hdfsUrl), conf); |
|
|
|
@ -136,34 +151,72 @@ public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> {
|
|
|
|
|
conf.setClass("fs.hdfs.impl", DistributedFileSystem.class, FileSystem.class); |
|
|
|
|
conf.set("ipc.client.fallback-to-simple-auth-allowed", "true"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return readXmlProperties(conf); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* 如果明确用代码设置了值不会从hdfs-site.xml,core-site.xml读取属性 |
|
|
|
|
* <p> |
|
|
|
|
* 这里主要是为了读取用户一些特殊的配置 |
|
|
|
|
* 比如rpc保护模式、principal格式限制之类的 |
|
|
|
|
* |
|
|
|
|
* @param conf 配置 |
|
|
|
|
* @return 加载了hdfs-site.xml,core-site.xml文件的配置 |
|
|
|
|
*/ |
|
|
|
|
private Configuration readXmlProperties(Configuration conf) { |
|
|
|
|
|
|
|
|
|
//插件目录
|
|
|
|
|
PluginMarker marker = PluginContexts.currentContext().getMarker(); |
|
|
|
|
if (marker != null) { |
|
|
|
|
String location = StableUtils.pathJoin(ProjectConstants.PLUGINS_NAME, marker.getHomeName()); |
|
|
|
|
String coreSiteXml = StableUtils.pathJoin(location, "core-site.xml"); |
|
|
|
|
String hdfsSiteXml = StableUtils.pathJoin(location, "hdfs-site.xml"); |
|
|
|
|
if (WorkContext.getWorkResource().exist(coreSiteXml)) { |
|
|
|
|
FineLoggerFactory.getLogger().debug("[HDFS REPOSITORY] core-site.xml is add to configuration"); |
|
|
|
|
conf.addResource(WorkContext.getWorkResource().openStream(coreSiteXml)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (WorkContext.getWorkResource().exist(hdfsSiteXml)) { |
|
|
|
|
FineLoggerFactory.getLogger().debug("[HDFS REPOSITORY] hdfs-site.xml is add to configuration"); |
|
|
|
|
conf.addResource(WorkContext.getWorkResource().openStream(hdfsSiteXml)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return conf; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private boolean kerberosAuthenticated(HDFSConfig config) { |
|
|
|
|
/** |
|
|
|
|
* 根据是否配置了kerberos的字段决定是否走kerberos认证逻辑 |
|
|
|
|
* |
|
|
|
|
* @param config |
|
|
|
|
* @return |
|
|
|
|
*/ |
|
|
|
|
private boolean needKrbAuth(HDFSConfig config) { |
|
|
|
|
return StringUtils.isNotEmpty(config.getKeyTab()) |
|
|
|
|
&& StringUtils.isNotEmpty(config.getPrincipal()) |
|
|
|
|
&& StringUtils.isNotEmpty(config.getKrbConf()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* BI-third内置了hadoop2.6的包,插件优先从lib下加载类, |
|
|
|
|
* 此时kerberos认证会报错"Failed to specify server's Kerberos principal name" |
|
|
|
|
* 需要设置一下principal的格式 |
|
|
|
|
* @param conf |
|
|
|
|
* @param principal |
|
|
|
|
* 刷新hadoop配置对象,加载设置的jvm参数里配置的krb文件 |
|
|
|
|
* |
|
|
|
|
* @throws Exception |
|
|
|
|
*/ |
|
|
|
|
private void processConfForPrincipal(Configuration conf, String principal) { |
|
|
|
|
//2.6.2以前的版本hdfs-site.xml没有默认的pricipal格式设置,需要手动加上
|
|
|
|
|
//根据Kerberos V5 principal的格式primary/instance@REALM,确定实际的格式
|
|
|
|
|
String principalPattern; |
|
|
|
|
int primaryIdx = principal.indexOf("hdfs/"); |
|
|
|
|
int atIdx = principal.indexOf("@"); |
|
|
|
|
if (primaryIdx > -1 && atIdx > primaryIdx) { |
|
|
|
|
String name = principal.substring(primaryIdx + "hdfs/".length(), atIdx - 1); |
|
|
|
|
principalPattern = principal.replace(name, "*"); |
|
|
|
|
conf.set("dfs.namenode.kerberos.principal.pattern", principalPattern); |
|
|
|
|
private void refreshConfig() throws Exception { |
|
|
|
|
Class<?> configClassRef; |
|
|
|
|
if (System.getProperty("java.vendor").contains("IBM")) { |
|
|
|
|
configClassRef = Class.forName("com.ibm.security.krb5.internal.Config"); |
|
|
|
|
} else { |
|
|
|
|
configClassRef = Class.forName("sun.security.krb5.Config"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Method getInstanceMethod = configClassRef.getMethod("getInstance"); |
|
|
|
|
Object kerbConf = getInstanceMethod.invoke(configClassRef); |
|
|
|
|
Method refreshMethod = configClassRef.getDeclaredMethod("refresh"); |
|
|
|
|
refreshMethod.invoke(kerbConf); |
|
|
|
|
} |
|
|
|
|
} |