@ -24,17 +24,15 @@ import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.IOUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.apache.hadoop.conf.Configuration ;
import org.apache.hadoop.fs.FSDataInputStream ;
import org.apache.hadoop.fs.* ;
import org.apache.hadoop.fs.FileSystem ;
import org.apache.hadoop.fs.FileUtil ;
import org.apache.hadoop.fs.Path ;
import org.apache.hadoop.fs.FileStatus ;
import org.apache.hadoop.security.UserGroupInformation ;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.io.* ;
import java.security.PrivilegedExceptionAction ;
import java.util.List ;
import java.util.Map ;
import java.util.stream.Collectors ;
@ -51,18 +49,41 @@ public class HadoopUtils implements Closeable {
private static final Logger logger = LoggerFactory . getLogger ( HadoopUtils . class ) ;
private static String hdfsUser = PropertyUtils . getString ( Constants . HDFS_ROOT_USER ) ;
private static volatile HadoopUtils instance = new HadoopUtils ( ) ;
private static volatile Configuration configuration ;
private static FileSystem fs ;
private HadoopUtils ( ) {
if ( StringUtils . isEmpty ( hdfsUser ) ) {
hdfsUser = PropertyUtils . getString ( Constants . HDFS_ROOT_USER ) ;
}
init ( ) ;
initHdfsPath ( ) ;
}
public static HadoopUtils getInstance ( ) {
return instance ;
}
/ * *
* init escheduler root path in hdfs
* /
private void initHdfsPath ( ) {
String hdfsPath = getString ( Constants . DATA_STORE_2_HDFS_BASEPATH ) ;
Path path = new Path ( hdfsPath ) ;
try {
if ( ! fs . exists ( path ) ) {
fs . mkdirs ( path ) ;
}
} catch ( Exception e ) {
logger . error ( e . getMessage ( ) , e ) ;
}
}
/ * *
* init hadoop configuration
* /
@ -100,7 +121,20 @@ public class HadoopUtils implements Closeable {
}
if ( fs = = null ) {
fs = FileSystem . get ( configuration ) ;
if ( StringUtils . isNotEmpty ( hdfsUser ) ) {
//UserGroupInformation ugi = UserGroupInformation.createProxyUser(hdfsUser,UserGroupInformation.getLoginUser());
UserGroupInformation ugi = UserGroupInformation . createRemoteUser ( hdfsUser ) ;
ugi . doAs ( new PrivilegedExceptionAction < Boolean > ( ) {
@Override
public Boolean run ( ) throws Exception {
fs = FileSystem . get ( configuration ) ;
return true ;
}
} ) ;
} else {
logger . warn ( "hdfs.root.user is not set value!" ) ;
fs = FileSystem . get ( configuration ) ;
}
}
String rmHaIds = getString ( YARN_RESOURCEMANAGER_HA_RM_IDS ) ;
String appAddress = getString ( Constants . YARN_APPLICATION_STATUS_ADDRESS ) ;