|
|
|
@ -24,19 +24,14 @@ import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
import org.apache.commons.io.IOUtils; |
|
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
|
import org.apache.hadoop.conf.Configuration; |
|
|
|
|
import org.apache.hadoop.fs.FSDataInputStream; |
|
|
|
|
import org.apache.hadoop.fs.*; |
|
|
|
|
import org.apache.hadoop.fs.FileSystem; |
|
|
|
|
import org.apache.hadoop.fs.FileUtil; |
|
|
|
|
import org.apache.hadoop.fs.Path; |
|
|
|
|
import org.apache.hadoop.fs.FileStatus; |
|
|
|
|
import org.apache.hadoop.security.UserGroupInformation; |
|
|
|
|
import org.apache.hadoop.yarn.client.cli.RMAdminCLI; |
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
import java.io.*; |
|
|
|
|
import java.security.PrivilegedAction; |
|
|
|
|
import java.security.PrivilegedActionException; |
|
|
|
|
import java.security.PrivilegedExceptionAction; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
@ -54,12 +49,18 @@ public class HadoopUtils implements Closeable {
|
|
|
|
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class); |
|
|
|
|
|
|
|
|
|
private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER); |
|
|
|
|
private static volatile HadoopUtils instance = new HadoopUtils(); |
|
|
|
|
private static volatile Configuration configuration; |
|
|
|
|
private static FileSystem fs; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private HadoopUtils(){ |
|
|
|
|
if(StringUtils.isEmpty(hdfsUser)){ |
|
|
|
|
hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER); |
|
|
|
|
} |
|
|
|
|
init(); |
|
|
|
|
initHdfsPath(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static HadoopUtils getInstance(){ |
|
|
|
@ -70,26 +71,19 @@ public class HadoopUtils implements Closeable {
|
|
|
|
|
* init escheduler root path in hdfs |
|
|
|
|
*/ |
|
|
|
|
private void initHdfsPath(){ |
|
|
|
|
String hdfsUser = getString(Constants.HDFS_ROOT_USER); |
|
|
|
|
String hdfsPath = getString(Constants.DATA_STORE_2_HDFS_BASEPATH); |
|
|
|
|
Path path = new Path(hdfsPath); |
|
|
|
|
try { |
|
|
|
|
UserGroupInformation ugi = UserGroupInformation.createProxyUser(hdfsUser,UserGroupInformation.getLoginUser()); |
|
|
|
|
ugi.doAs(new PrivilegedExceptionAction<Boolean>() { |
|
|
|
|
@Override |
|
|
|
|
public Boolean run() throws Exception { |
|
|
|
|
|
|
|
|
|
if(!fs.exists(path)){ |
|
|
|
|
return fs.mkdirs(path); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
try { |
|
|
|
|
if (!fs.exists(path)) { |
|
|
|
|
fs.mkdirs(path); |
|
|
|
|
} |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error(e.getMessage(),e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* init hadoop configuration |
|
|
|
|
*/ |
|
|
|
@ -127,7 +121,20 @@ public class HadoopUtils implements Closeable {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (fs == null) { |
|
|
|
|
fs = FileSystem.get(configuration); |
|
|
|
|
if(StringUtils.isNotEmpty(hdfsUser)){ |
|
|
|
|
//UserGroupInformation ugi = UserGroupInformation.createProxyUser(hdfsUser,UserGroupInformation.getLoginUser());
|
|
|
|
|
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser); |
|
|
|
|
ugi.doAs(new PrivilegedExceptionAction<Boolean>() { |
|
|
|
|
@Override |
|
|
|
|
public Boolean run() throws Exception { |
|
|
|
|
fs = FileSystem.get(configuration); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
}else{ |
|
|
|
|
logger.warn("hdfs.root.user is not set value!"); |
|
|
|
|
fs = FileSystem.get(configuration); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
String rmHaIds = getString(YARN_RESOURCEMANAGER_HA_RM_IDS); |
|
|
|
|
String appAddress = getString(Constants.YARN_APPLICATION_STATUS_ADDRESS); |
|
|
|
|