|
|
|
@ -4,11 +4,14 @@ import com.fr.io.base.provider.impl.ConfigRepositoryFactory;
|
|
|
|
|
import com.fr.io.context.info.RepositoryProfile; |
|
|
|
|
import com.fr.io.repository.ResourceRepository; |
|
|
|
|
import com.fr.log.FineLoggerFactory; |
|
|
|
|
import com.fr.plugin.hdfs.repository.HDFSFactoryProvider; |
|
|
|
|
import com.fr.stable.StringUtils; |
|
|
|
|
import org.apache.hadoop.conf.Configuration; |
|
|
|
|
import org.apache.hadoop.fs.FileSystem; |
|
|
|
|
import org.apache.hadoop.fs.Path; |
|
|
|
|
import org.apache.hadoop.hdfs.DistributedFileSystem; |
|
|
|
|
import org.apache.hadoop.security.AnnotatedSecurityInfo; |
|
|
|
|
import org.apache.hadoop.security.SecurityUtil; |
|
|
|
|
import org.apache.hadoop.security.UserGroupInformation; |
|
|
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
@ -23,6 +26,7 @@ public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> {
|
|
|
|
|
private static final String HDFS_SCHEMA = "hdfs://"; |
|
|
|
|
private static final String DEFAULT_HOST = "localhost"; |
|
|
|
|
private static final String DEFAULT_PORT = "9000"; |
|
|
|
|
private boolean kerberosAuthModeSet = false; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public HDFSRepositoryFactory() { |
|
|
|
@ -46,7 +50,7 @@ public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> {
|
|
|
|
|
fileSystem = createFileSystem(config); |
|
|
|
|
return fileSystem.exists(new Path("/")); |
|
|
|
|
} catch (IOException e) { |
|
|
|
|
FineLoggerFactory.getLogger().error(e.getMessage()); |
|
|
|
|
FineLoggerFactory.getLogger().error(e.getMessage(), e); |
|
|
|
|
} finally { |
|
|
|
|
if (fileSystem != null) { |
|
|
|
|
try { |
|
|
|
@ -68,25 +72,41 @@ public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> {
|
|
|
|
|
private FileSystem createFileSystem(HDFSConfig config) { |
|
|
|
|
Configuration conf = createConfiguration(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//根据配置生成url
|
|
|
|
|
String host = StringUtils.isEmpty(config.getHost()) ? DEFAULT_HOST : config.getHost(); |
|
|
|
|
String port = config.getPort() == 0 ? DEFAULT_PORT : String.valueOf(config.getPort()); |
|
|
|
|
|
|
|
|
|
String hdfsUrl = HDFS_SCHEMA.concat(host).concat(":").concat(port); |
|
|
|
|
String principal = config.getPrincipal(); |
|
|
|
|
String krb5Conf = config.getKrbConf(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//开启了kerberos验证
|
|
|
|
|
if (kerberosAuthenticated(config)) { |
|
|
|
|
boolean kerberos = kerberosAuthenticated(config); |
|
|
|
|
if (kerberos) { |
|
|
|
|
try { |
|
|
|
|
UserGroupInformation.setConfiguration(conf);//UserGroupInformation初始化
|
|
|
|
|
System.setProperty("java.security.krb5.conf", krb5Conf); |
|
|
|
|
conf.set("hadoop.security.authentication", "kerberos"); |
|
|
|
|
|
|
|
|
|
//类似OSGI下,类加载需要设置SecurityUtil.setSecurityInfoProviders(new AnnotatedSecurityInfo());
|
|
|
|
|
//refer to https://stackoverflow.com/questions/37608049/how-to-connect-with-hdfs-via-kerberos-from-osgi-bundles
|
|
|
|
|
SecurityUtil.setSecurityInfoProviders(new AnnotatedSecurityInfo()); |
|
|
|
|
//UserGroupInformation初始化
|
|
|
|
|
UserGroupInformation.setConfiguration(conf); |
|
|
|
|
UserGroupInformation.loginUserFromKeytab(config.getPrincipal(), config.getKeyTab()); |
|
|
|
|
} catch (IOException e) { |
|
|
|
|
FineLoggerFactory.getLogger().error(e.getMessage()); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
FineLoggerFactory.getLogger().error(e.getMessage(), e); |
|
|
|
|
} finally { |
|
|
|
|
kerberosAuthModeSet = true; |
|
|
|
|
} |
|
|
|
|
} else if (kerberosAuthModeSet){ |
|
|
|
|
conf.set("hadoop.security.authorization", "false"); |
|
|
|
|
conf.set("hadoop.security.authentication", "simple"); |
|
|
|
|
} |
|
|
|
|
try { |
|
|
|
|
if (StringUtils.isNotEmpty(principal)) { |
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotEmpty(principal) && !kerberos) { |
|
|
|
|
return FileSystem.newInstance(URI.create(hdfsUrl), conf, principal); |
|
|
|
|
} else { |
|
|
|
|
return FileSystem.newInstance(URI.create(hdfsUrl), conf); |
|
|
|
@ -94,6 +114,7 @@ public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
FineLoggerFactory.getLogger().error(e.getMessage(), e); |
|
|
|
|
throw new RuntimeException(e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -105,21 +126,22 @@ public class HDFSRepositoryFactory extends ConfigRepositoryFactory<HDFSConfig> {
|
|
|
|
|
* @return 配置 |
|
|
|
|
*/ |
|
|
|
|
private Configuration createConfiguration() { |
|
|
|
|
final Configuration conf = new Configuration(); |
|
|
|
|
Configuration conf = new Configuration(); |
|
|
|
|
|
|
|
|
|
//使用GeneralUtils才能加载到FileSystem实现类
|
|
|
|
|
conf.setClassLoader(HDFSFactoryProvider.class.getClassLoader()); |
|
|
|
|
|
|
|
|
|
//设置hdfs协议实现
|
|
|
|
|
conf.setClass("fs.hdfs.impl", DistributedFileSystem.class, FileSystem.class); |
|
|
|
|
|
|
|
|
|
//使用GeneralUtils才能加载到FileSystem实现类
|
|
|
|
|
conf.setClassLoader(this.getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
conf.set("ipc.client.fallback-to-simple-auth-allowed", "true"); |
|
|
|
|
|
|
|
|
|
return conf; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private boolean kerberosAuthenticated(HDFSConfig config) { |
|
|
|
|
return StringUtils.isNotEmpty(config.getKeyTab()) && StringUtils.isNotEmpty(config.getPrincipal()); |
|
|
|
|
return StringUtils.isNotEmpty(config.getKeyTab()) |
|
|
|
|
&& StringUtils.isNotEmpty(config.getPrincipal()) |
|
|
|
|
&& StringUtils.isNotEmpty(config.getKrbConf()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|