Update HadoopUtils.java

optimize HadoopUtils
Branch: pull/2/head
dailidong committed 5 years ago (via GitHub)
Commit: b85b59375e
1 changed file with 282 changes:
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@@ -16,6 +16,9 @@
  */
 package org.apache.dolphinscheduler.common.utils;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.ResUploadType;
@@ -23,6 +26,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONException;
 import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.io.IOUtils;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,9 +36,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.nio.file.Files;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -52,29 +59,39 @@ public class HadoopUtils implements Closeable {
     public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
     public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
     public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
 
-    private static volatile HadoopUtils instance = new HadoopUtils();
-    private static volatile Configuration configuration;
+    private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
+
+    private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder
+            .newBuilder()
+            .expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 7), TimeUnit.DAYS)
+            .build(new CacheLoader<String, HadoopUtils>() {
+                @Override
+                public HadoopUtils load(String key) throws Exception {
+                    return new HadoopUtils();
+                }
+            });
+
     private static volatile boolean yarnEnabled = false;
-    private static FileSystem fs;
 
-    private HadoopUtils(){
+    private Configuration configuration;
+    private FileSystem fs;
+
+    private HadoopUtils() {
         init();
         initHdfsPath();
     }
 
-    public static HadoopUtils getInstance(){
-        // if kerberos startup , renew HadoopUtils
-        if (CommonUtils.getKerberosStartupState()){
-            return new HadoopUtils();
-        }
-        return instance;
+    public static HadoopUtils getInstance() {
+        return cache.getUnchecked(HADOOP_UTILS_KEY);
     }
 
     /**
      * init dolphinscheduler root path in hdfs
     */
-    private void initHdfsPath(){
+    private void initHdfsPath() {
         Path path = new Path(resourceUploadPath);
 
         try {
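
Note on the hunk above: the eager singleton plus the per-call Kerberos check is replaced by a Guava LoadingCache used as an expiring singleton, so a fresh HadoopUtils (and with it a fresh keytab login) is built once the cached entry times out. A minimal self-contained sketch of the same pattern, assuming a fixed 7-day expiry; the class and key names here are illustrative, not part of the commit:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.concurrent.TimeUnit;

    public class ExpiringSingleton {

        private static final String KEY = "SINGLETON";

        // Entries are dropped 7 days after being written; the next getUnchecked(KEY)
        // calls load() again and builds a brand-new instance.
        private static final LoadingCache<String, ExpiringSingleton> CACHE = CacheBuilder
                .newBuilder()
                .expireAfterWrite(7, TimeUnit.DAYS)
                .build(new CacheLoader<String, ExpiringSingleton>() {
                    @Override
                    public ExpiringSingleton load(String key) {
                        return new ExpiringSingleton(); // expensive init, e.g. re-login from a keytab
                    }
                });

        private ExpiringSingleton() {
        }

        public static ExpiringSingleton getInstance() {
            return CACHE.getUnchecked(KEY);
        }
    }
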
@@ -82,7 +99,7 @@ public class HadoopUtils implements Closeable {
                 fs.mkdirs(path);
             }
         } catch (Exception e) {
-            logger.error(e.getMessage(),e);
+            logger.error(e.getMessage(), e);
         }
     }
@@ -91,76 +108,68 @@
     /**
      * init hadoop configuration
      */
     private void init() {
-        if (configuration == null) {
-            synchronized (HadoopUtils.class) {
-                if (configuration == null) {
-                    try {
+        try {
             configuration = new Configuration();
 
             String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
             ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType);
 
             if (resUploadType == ResUploadType.HDFS){
                 if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){
                     System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
                             PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
                     configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
                     hdfsUser = "";
                     UserGroupInformation.setConfiguration(configuration);
                     UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
                             PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
                 }
 
                 String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
                 //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
                 // the default is the local file system
-                if(defaultFS.startsWith("file")){
+                if (defaultFS.startsWith("file")) {
                     String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
-                    if(StringUtils.isNotBlank(defaultFSProp)){
+                    if (StringUtils.isNotBlank(defaultFSProp)) {
                         Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
-                        configuration.set(Constants.FS_DEFAULTFS,defaultFSProp);
+                        configuration.set(Constants.FS_DEFAULTFS, defaultFSProp);
                         fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
-                    }else{
-                        logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS );
+                    } else {
+                        logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS);
                         throw new RuntimeException(
                                 String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
                         );
                     }
-                }else{
+                } else {
                     logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
                 }
 
                 if (fs == null) {
-                    if(StringUtils.isNotEmpty(hdfsUser)){
-                        //UserGroupInformation ugi = UserGroupInformation.createProxyUser(hdfsUser,UserGroupInformation.getLoginUser());
+                    if (StringUtils.isNotEmpty(hdfsUser)) {
                         UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
                         ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
                             @Override
                             public Boolean run() throws Exception {
                                 fs = FileSystem.get(configuration);
                                 return true;
                             }
                         });
-                    }else{
+                    } else {
                         logger.warn("hdfs.root.user is not set value!");
                         fs = FileSystem.get(configuration);
                     }
                 }
-            }else if (resUploadType == ResUploadType.S3){
+            } else if (resUploadType == ResUploadType.S3) {
                 configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
                 configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
                 configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
                 configuration.set(Constants.FS_S3A_SECRET_KEY, PropertyUtils.getString(Constants.FS_S3A_SECRET_KEY));
                 fs = FileSystem.get(configuration);
             }
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
-                    }
-                }
-            }
-        }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
     }
@@ -206,15 +215,15 @@
     /**
      * cat file on hdfs
      *
      * @param hdfsFilePath hdfs file path
      * @return byte[] byte array
      * @throws IOException errors
      */
     public byte[] catFile(String hdfsFilePath) throws IOException {
 
-        if(StringUtils.isBlank(hdfsFilePath)){
-            logger.error("hdfs file path:{} is blank",hdfsFilePath);
-            return null;
+        if (StringUtils.isBlank(hdfsFilePath)) {
+            logger.error("hdfs file path:{} is blank", hdfsFilePath);
+            return new byte[0];
         }
 
         FSDataInputStream fsDataInputStream = fs.open(new Path(hdfsFilePath));
@@ -222,29 +231,28 @@
     }
 
     /**
      * cat file on hdfs
      *
      * @param hdfsFilePath hdfs file path
      * @param skipLineNums skip line numbers
      * @param limit        read how many lines
      * @return content of file
      * @throws IOException errors
      */
     public List<String> catFile(String hdfsFilePath, int skipLineNums, int limit) throws IOException {
 
-        if (StringUtils.isBlank(hdfsFilePath)){
-            logger.error("hdfs file path:{} is blank",hdfsFilePath);
-            return null;
+        if (StringUtils.isBlank(hdfsFilePath)) {
+            logger.error("hdfs file path:{} is blank", hdfsFilePath);
+            return Collections.emptyList();
         }
 
-        try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))){
+        try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))) {
             BufferedReader br = new BufferedReader(new InputStreamReader(in));
             Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
             return stream.collect(Collectors.toList());
         }
     }
 
     /**
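
One behavioural consequence of the two catFile changes above: a blank path now yields an empty result instead of null, so callers can drop their null checks. A small usage sketch (shown as a fragment; hdfsFilePath is a placeholder variable):

    // before this commit both overloads returned null for a blank path
    byte[] content = HadoopUtils.getInstance().catFile(hdfsFilePath);             // now new byte[0] for blank input
    List<String> lines = HadoopUtils.getInstance().catFile(hdfsFilePath, 0, 100); // now Collections.emptyList()

    // safe to iterate directly, no null check needed
    for (String line : lines) {
        System.out.println(line);
    }
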
@@ -277,17 +285,17 @@
     /**
      * the src file is on the local disk. Add it to FS at
      * the given dst name.
+     *
      * @param srcFile      local file
      * @param dstHdfsPath  destination hdfs path
      * @param deleteSource whether to delete the src
      * @param overwrite    whether to overwrite an existing file
      * @return if success or not
      * @throws IOException errors
      */
     public boolean copyLocalToHdfs(String srcFile, String dstHdfsPath, boolean deleteSource, boolean overwrite) throws IOException {
         Path srcPath = new Path(srcFile);
-        Path dstPath= new Path(dstHdfsPath);
+        Path dstPath = new Path(dstHdfsPath);
 
         fs.copyFromLocalFile(deleteSource, overwrite, srcPath, dstPath);
@@ -297,10 +305,10 @@
     /**
      * copy hdfs file to local
      *
      * @param srcHdfsFilePath source hdfs file path
      * @param dstFile         destination file
      * @param deleteSource    delete source
      * @param overwrite       overwrite
      * @return result of copy hdfs file to local
      * @throws IOException errors
      */
@@ -311,14 +319,14 @@
         if (dstPath.exists()) {
             if (dstPath.isFile()) {
                 if (overwrite) {
-                    dstPath.delete();
+                    Files.delete(dstPath.toPath());
                 }
             } else {
                 logger.error("destination file must be a file");
             }
         }
 
-        if(!dstPath.getParentFile().exists()){
+        if (!dstPath.getParentFile().exists()) {
             dstPath.getParentFile().mkdirs();
         }
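
A side note on the java.nio change above: File.delete only reports failure through its boolean return value, while Files.delete throws an IOException that says why the deletion failed. A minimal comparison under an assumed placeholder path (not part of the commit):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    public class DeleteComparison {
        public static void main(String[] args) {
            File target = new File("/tmp/example-target.txt"); // placeholder path

            // Old style: failure is silent unless the boolean result is checked.
            boolean deleted = target.delete();
            System.out.println("File.delete() returned " + deleted);

            // New style: failure surfaces as a descriptive IOException
            // (NoSuchFileException, DirectoryNotEmptyException, AccessDeniedException, ...).
            try {
                Files.delete(target.toPath());
            } catch (IOException e) {
                System.out.println("Files.delete() failed: " + e);
            }
        }
    }
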
@@ -326,14 +334,13 @@
     }
 
     /**
-     *
      * delete a file
      *
      * @param hdfsFilePath the path to delete.
      * @param recursive    if path is a directory and set to
      *                     true, the directory is deleted else throws an exception. In
      *                     case of a file the recursive can be set to either true or false.
      * @return true if delete is successful else false.
      * @throws IOException errors
      */
     public boolean delete(String hdfsFilePath, boolean recursive) throws IOException {
@@ -358,7 +365,7 @@
      * @return {@link FileStatus} file status
      * @throws Exception errors
      */
-    public FileStatus[] listFileStatus(String filePath)throws Exception{
+    public FileStatus[] listFileStatus(String filePath) throws Exception {
         try {
             return fs.listStatus(new Path(filePath));
         } catch (IOException e) {
@@ -370,10 +377,11 @@
     /**
      * Renames Path src to Path dst. Can take place on local fs
      * or remote DFS.
+     *
      * @param src path to be renamed
      * @param dst new path after rename
-     * @throws IOException on failure
      * @return true if rename is successful
+     * @throws IOException on failure
      */
     public boolean rename(String src, String dst) throws IOException {
         return fs.rename(new Path(src), new Path(dst));
@@ -403,7 +411,7 @@
         String responseContent = HttpUtils.get(applicationUrl);
 
-        JSONObject jsonObject = JSONObject.parseObject(responseContent);
+        JSONObject jsonObject = JSON.parseObject(responseContent);
         String result = jsonObject.getJSONObject("app").getString("finalStatus");
 
         switch (result) {
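
The hunk above switches to the fastjson JSON.parseObject entry point; functionally it still yields a JSONObject. A minimal sketch of the parsing step in isolation, with a hard-coded sample standing in for the YARN REST response instead of a live HTTP call:

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;

    public class YarnStatusParseExample {
        public static void main(String[] args) {
            // stand-in for HttpUtils.get(applicationUrl)
            String responseContent = "{\"app\":{\"finalStatus\":\"SUCCEEDED\"}}";

            JSONObject jsonObject = JSON.parseObject(responseContent);
            String result = jsonObject.getJSONObject("app").getString("finalStatus");
            System.out.println(result); // SUCCEEDED
        }
    }
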
@@ -438,6 +446,22 @@
         }
     }
 
+    /**
+     * hdfs resource dir
+     *
+     * @param tenantCode tenant code
+     * @return hdfs resource dir
+     */
+    public static String getHdfsDir(ResourceType resourceType,String tenantCode) {
+        String hdfsDir = "";
+        if (resourceType.equals(ResourceType.FILE)) {
+            hdfsDir = getHdfsResDir(tenantCode);
+        } else if (resourceType.equals(ResourceType.UDF)) {
+            hdfsDir = getHdfsUdfDir(tenantCode);
+        }
+        return hdfsDir;
+    }
+
     /**
      * hdfs resource dir
      *
@@ -452,11 +476,11 @@
      * hdfs user dir
      *
      * @param tenantCode tenant code
      * @param userId     user id
      * @return hdfs resource dir
      */
-    public static String getHdfsUserDir(String tenantCode,int userId) {
-        return String.format("%s/home/%d", getHdfsTenantDir(tenantCode),userId);
+    public static String getHdfsUserDir(String tenantCode, int userId) {
+        return String.format("%s/home/%d", getHdfsTenantDir(tenantCode), userId);
     }
 
     /**
@@ -469,26 +493,39 @@
         return String.format("%s/udfs", getHdfsTenantDir(tenantCode));
     }
 
+    /**
+     * get hdfs file name
+     *
+     * @param resourceType resource type
+     * @param tenantCode   tenant code
+     * @param fileName     file name
+     * @return hdfs file name
+     */
+    public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
+        return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName);
+    }
+
     /**
-     * get absolute path and name for file on hdfs
+     * get absolute path and name for resource file on hdfs
      *
      * @param tenantCode tenant code
-     * @param filename   file name
+     * @param fileName   file name
      * @return get absolute path and name for file on hdfs
      */
-    public static String getHdfsFilename(String tenantCode, String filename) {
-        return String.format("%s/%s", getHdfsResDir(tenantCode), filename);
+    public static String getHdfsResourceFileName(String tenantCode, String fileName) {
+        return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
     }
 
     /**
      * get absolute path and name for udf file on hdfs
      *
      * @param tenantCode tenant code
-     * @param filename   file name
+     * @param fileName   file name
      * @return get absolute path and name for udf file on hdfs
      */
-    public static String getHdfsUdfFilename(String tenantCode, String filename) {
-        return String.format("%s/%s", getHdfsUdfDir(tenantCode), filename);
+    public static String getHdfsUdfFileName(String tenantCode, String fileName) {
+        return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
     }
 
     /**
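
A short usage sketch of the helpers introduced or renamed above (shown as a fragment; the tenant code and file names are placeholders):

    // FILE resources resolve through getHdfsResDir, UDF resources through getHdfsUdfDir
    String scriptPath = HadoopUtils.getHdfsFileName(ResourceType.FILE, "tenant1", "wordcount.sh");
    String udfPath    = HadoopUtils.getHdfsFileName(ResourceType.UDF, "tenant1", "my_udf.jar");

    // equivalent type-specific variants (formerly getHdfsFilename / getHdfsUdfFilename)
    String scriptPath2 = HadoopUtils.getHdfsResourceFileName("tenant1", "wordcount.sh");
    String udfPath2    = HadoopUtils.getHdfsUdfFileName("tenant1", "my_udf.jar");
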
@@ -504,7 +541,7 @@
      * getAppAddress
      *
      * @param appAddress app address
      * @param rmHa       resource manager ha
      * @return app address
      */
     public static String getAppAddress(String appAddress, String rmHa) {
@@ -549,8 +586,6 @@
      */
     private static final class YarnHAAdminUtils extends RMAdminCLI {
 
-        private static final Logger logger = LoggerFactory.getLogger(YarnHAAdminUtils.class);
-
         /**
          * get active resourcemanager
          *
@@ -609,8 +644,7 @@
             JSONObject jsonObject = JSON.parseObject(retStr);
 
             //get ResourceManager state
-            String state = jsonObject.getJSONObject("clusterInfo").getString("haState");
-            return state;
+            return jsonObject.getJSONObject("clusterInfo").getString("haState");
         }
     }
