Browse Source

Cache HadoopUtils instance with specific days expire time (#2181)

* Cache HadoopUtils instance with 7 days expire time

* solve sonar issue

* add kerberos expire time config

* move KERBEROS_EXPIRE_TIME to Constants.java
pull/3/MERGE
tswstarplanet 5 years ago committed by gaojun2048
parent
commit
77c4bcb96e
  1. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 96
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  3. 2
      dolphinscheduler-common/src/main/resources/common.properties

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -813,6 +813,11 @@ public final class Constants {
*/ */
public static final String KERBEROS = "kerberos"; public static final String KERBEROS = "kerberos";
/**
* kerberos expire time
*/
public static final String KERBEROS_EXPIRE_TIME = "kerberos.expire.time";
/** /**
* java.security.krb5.conf * java.security.krb5.conf
*/ */

96
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -16,6 +16,9 @@
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.dolphinscheduler.common.enums.ResUploadType;
@ -37,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -48,30 +52,37 @@ public class HadoopUtils implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class); private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
private static HadoopUtils instance = new HadoopUtils(); private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 7), TimeUnit.DAYS)
.build(new CacheLoader<String, HadoopUtils>() {
@Override
public HadoopUtils load(String key) throws Exception {
return new HadoopUtils();
}
});
private static Configuration configuration; private static Configuration configuration;
private static FileSystem fs; private static FileSystem fs;
private String hdfsUser; private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
private HadoopUtils(){ private HadoopUtils() {
hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
init(); init();
initHdfsPath(); initHdfsPath();
} }
public static HadoopUtils getInstance(){ public static HadoopUtils getInstance() {
// if kerberos startup , renew HadoopUtils
if (CommonUtils.getKerberosStartupState()){ return cache.getUnchecked(HADOOP_UTILS_KEY);
return new HadoopUtils();
}
return instance;
} }
/** /**
* init dolphinscheduler root path in hdfs * init dolphinscheduler root path in hdfs
*/ */
private void initHdfsPath(){ private void initHdfsPath() {
String hdfsPath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH); String hdfsPath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH);
Path path = new Path(hdfsPath); Path path = new Path(hdfsPath);
@ -80,7 +91,7 @@ public class HadoopUtils implements Closeable {
fs.mkdirs(path); fs.mkdirs(path);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(),e); logger.error(e.getMessage(), e);
} }
} }
@ -88,21 +99,18 @@ public class HadoopUtils implements Closeable {
/** /**
* init hadoop configuration * init hadoop configuration
*/ */
private void init() { private static void init() {
if (configuration == null) {
synchronized (HadoopUtils.class) {
if (configuration == null) {
try { try {
configuration = new Configuration(); configuration = new Configuration();
String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE); String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
if (resUploadType == ResUploadType.HDFS){ if (resUploadType == ResUploadType.HDFS) {
if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)){ if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)) {
System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF, System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)); PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos"); configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(configuration); UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME), UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH)); PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
@ -111,24 +119,24 @@ public class HadoopUtils implements Closeable {
String defaultFS = configuration.get(Constants.FS_DEFAULTFS); String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
//first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
// the default is the local file system // the default is the local file system
if(defaultFS.startsWith("file")){ if (defaultFS.startsWith("file")) {
String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS); String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
if(StringUtils.isNotBlank(defaultFSProp)){ if (StringUtils.isNotBlank(defaultFSProp)) {
Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs."); Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
configuration.set(Constants.FS_DEFAULTFS,defaultFSProp); configuration.set(Constants.FS_DEFAULTFS, defaultFSProp);
fsRelatedProps.forEach((key, value) -> configuration.set(key, value)); fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
}else{ } else {
logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS ); logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS);
throw new RuntimeException( throw new RuntimeException(
String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS) String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
); );
} }
}else{ } else {
logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS); logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
} }
if (fs == null) { if (fs == null) {
if(StringUtils.isNotEmpty(hdfsUser)){ if (StringUtils.isNotEmpty(hdfsUser)) {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
ugi.doAs(new PrivilegedExceptionAction<Boolean>() { ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
@Override @Override
@ -137,12 +145,12 @@ public class HadoopUtils implements Closeable {
return true; return true;
} }
}); });
}else{ } else {
logger.warn("hdfs.root.user is not set value!"); logger.warn("hdfs.root.user is not set value!");
fs = FileSystem.get(configuration); fs = FileSystem.get(configuration);
} }
} }
}else if (resUploadType == ResUploadType.S3){ } else if (resUploadType == ResUploadType.S3) {
configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS)); configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT)); configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY)); configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
@ -161,10 +169,6 @@ public class HadoopUtils implements Closeable {
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
}
}
}
} }
/** /**
@ -193,8 +197,8 @@ public class HadoopUtils implements Closeable {
*/ */
public byte[] catFile(String hdfsFilePath) throws IOException { public byte[] catFile(String hdfsFilePath) throws IOException {
if(StringUtils.isBlank(hdfsFilePath)){ if (StringUtils.isBlank(hdfsFilePath)) {
logger.error("hdfs file path:{} is blank",hdfsFilePath); logger.error("hdfs file path:{} is blank", hdfsFilePath);
return new byte[0]; return new byte[0];
} }
@ -203,7 +207,6 @@ public class HadoopUtils implements Closeable {
} }
/** /**
* cat file on hdfs * cat file on hdfs
* *
@ -215,12 +218,12 @@ public class HadoopUtils implements Closeable {
*/ */
public List<String> catFile(String hdfsFilePath, int skipLineNums, int limit) throws IOException { public List<String> catFile(String hdfsFilePath, int skipLineNums, int limit) throws IOException {
if (StringUtils.isBlank(hdfsFilePath)){ if (StringUtils.isBlank(hdfsFilePath)) {
logger.error("hdfs file path:{} is blank",hdfsFilePath); logger.error("hdfs file path:{} is blank", hdfsFilePath);
return Collections.emptyList(); return Collections.emptyList();
} }
try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))){ try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))) {
BufferedReader br = new BufferedReader(new InputStreamReader(in)); BufferedReader br = new BufferedReader(new InputStreamReader(in));
Stream<String> stream = br.lines().skip(skipLineNums).limit(limit); Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList()); return stream.collect(Collectors.toList());
@ -258,7 +261,7 @@ public class HadoopUtils implements Closeable {
/** /**
* the src file is on the local disk. Add it to FS at * the src file is on the local disk. Add it to FS at
* the given dst name. * the given dst name.
*
* @param srcFile local file * @param srcFile local file
* @param dstHdfsPath destination hdfs path * @param dstHdfsPath destination hdfs path
* @param deleteSource whether to delete the src * @param deleteSource whether to delete the src
@ -268,7 +271,7 @@ public class HadoopUtils implements Closeable {
*/ */
public boolean copyLocalToHdfs(String srcFile, String dstHdfsPath, boolean deleteSource, boolean overwrite) throws IOException { public boolean copyLocalToHdfs(String srcFile, String dstHdfsPath, boolean deleteSource, boolean overwrite) throws IOException {
Path srcPath = new Path(srcFile); Path srcPath = new Path(srcFile);
Path dstPath= new Path(dstHdfsPath); Path dstPath = new Path(dstHdfsPath);
fs.copyFromLocalFile(deleteSource, overwrite, srcPath, dstPath); fs.copyFromLocalFile(deleteSource, overwrite, srcPath, dstPath);
@ -299,7 +302,7 @@ public class HadoopUtils implements Closeable {
} }
} }
if(!dstPath.getParentFile().exists()){ if (!dstPath.getParentFile().exists()) {
dstPath.getParentFile().mkdirs(); dstPath.getParentFile().mkdirs();
} }
@ -307,7 +310,6 @@ public class HadoopUtils implements Closeable {
} }
/** /**
*
* delete a file * delete a file
* *
* @param hdfsFilePath the path to delete. * @param hdfsFilePath the path to delete.
@ -339,7 +341,7 @@ public class HadoopUtils implements Closeable {
* @return {@link FileStatus} file status * @return {@link FileStatus} file status
* @throws Exception errors * @throws Exception errors
*/ */
public FileStatus[] listFileStatus(String filePath)throws Exception{ public FileStatus[] listFileStatus(String filePath) throws Exception {
try { try {
return fs.listStatus(new Path(filePath)); return fs.listStatus(new Path(filePath));
} catch (IOException e) { } catch (IOException e) {
@ -351,10 +353,11 @@ public class HadoopUtils implements Closeable {
/** /**
* Renames Path src to Path dst. Can take place on local fs * Renames Path src to Path dst. Can take place on local fs
* or remote DFS. * or remote DFS.
*
* @param src path to be renamed * @param src path to be renamed
* @param dst new path after rename * @param dst new path after rename
* @throws IOException on failure
* @return true if rename is successful * @return true if rename is successful
* @throws IOException on failure
*/ */
public boolean rename(String src, String dst) throws IOException { public boolean rename(String src, String dst) throws IOException {
return fs.rename(new Path(src), new Path(dst)); return fs.rename(new Path(src), new Path(dst));
@ -400,7 +403,6 @@ public class HadoopUtils implements Closeable {
} }
/** /**
*
* @return data hdfs path * @return data hdfs path
*/ */
public static String getHdfsDataBasePath() { public static String getHdfsDataBasePath() {
@ -430,8 +432,8 @@ public class HadoopUtils implements Closeable {
* @param userId user id * @param userId user id
* @return hdfs resource dir * @return hdfs resource dir
*/ */
public static String getHdfsUserDir(String tenantCode,int userId) { public static String getHdfsUserDir(String tenantCode, int userId) {
return String.format("%s/home/%d", getHdfsTenantDir(tenantCode),userId); return String.format("%s/home/%d", getHdfsTenantDir(tenantCode), userId);
} }
/** /**

2
dolphinscheduler-common/src/main/resources/common.properties

@ -91,4 +91,4 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine # If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
kerberos.expire.time=7

Loading…
Cancel
Save