@ -16,6 +16,9 @@
* /
package org.apache.dolphinscheduler.common.utils ;
import com.google.common.cache.CacheBuilder ;
import com.google.common.cache.CacheLoader ;
import com.google.common.cache.LoadingCache ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
import org.apache.dolphinscheduler.common.enums.ResUploadType ;
@ -32,9 +35,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory ;
import java.io.* ;
import java.nio.file.Files ;
import java.security.PrivilegedExceptionAction ;
import java.util.Collections ;
import java.util.List ;
import java.util.Map ;
import java.util.concurrent.TimeUnit ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
@ -46,32 +52,37 @@ public class HadoopUtils implements Closeable {
private static final Logger logger = LoggerFactory . getLogger ( HadoopUtils . class ) ;
private static String hdfsUser = PropertyUtils . getString ( Constants . HDFS_ROOT_USER ) ;
private static volatile HadoopUtils instance = new HadoopUtils ( ) ;
private static volatile Configuration configuration ;
private static FileSystem fs ;
private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY" ;
private static final LoadingCache < String , HadoopUtils > cache = CacheBuilder
. newBuilder ( )
. expireAfterWrite ( PropertyUtils . getInt ( Constants . KERBEROS_EXPIRE_TIME , 7 ) , TimeUnit . DAYS )
. build ( new CacheLoader < String , HadoopUtils > ( ) {
@Override
public HadoopUtils load ( String key ) throws Exception {
return new HadoopUtils ( ) ;
}
} ) ;
private HadoopUtils ( ) {
if ( StringUtils . isEmpty ( hdfsUser ) ) {
hdfsUser = PropertyUtils . getString ( Constants . HDFS_ROOT_USER ) ;
}
private Configuration configuration ;
private FileSystem fs ;
private static String hdfsUser = PropertyUtils . getString ( Constants . HDFS_ROOT_USER ) ;
private HadoopUtils ( ) {
init ( ) ;
initHdfsPath ( ) ;
}
public static HadoopUtils getInstance ( ) {
// if kerberos startup , renew HadoopUtils
if ( CommonUtils . getKerberosStartupState ( ) ) {
return new HadoopUtils ( ) ;
}
return instance ;
public static HadoopUtils getInstance ( ) {
return cache . getUnchecked ( HADOOP_UTILS_KEY ) ;
}
/ * *
* init dolphinscheduler root path in hdfs
* /
private void initHdfsPath ( ) {
private void initHdfsPath ( ) {
String hdfsPath = PropertyUtils . getString ( Constants . DATA_STORE_2_HDFS_BASEPATH ) ;
Path path = new Path ( hdfsPath ) ;
@ -80,7 +91,7 @@ public class HadoopUtils implements Closeable {
fs . mkdirs ( path ) ;
}
} catch ( Exception e ) {
logger . error ( e . getMessage ( ) , e ) ;
logger . error ( e . getMessage ( ) , e ) ;
}
}
@ -89,82 +100,74 @@ public class HadoopUtils implements Closeable {
* init hadoop configuration
* /
private void init ( ) {
if ( configuration = = null ) {
synchronized ( HadoopUtils . class ) {
if ( configuration = = null ) {
try {
configuration = new Configuration ( ) ;
String resUploadStartupType = PropertyUtils . getString ( Constants . RES_UPLOAD_STARTUP_TYPE ) ;
ResUploadType resUploadType = ResUploadType . valueOf ( resUploadStartupType ) ;
if ( resUploadType = = ResUploadType . HDFS ) {
if ( PropertyUtils . getBoolean ( Constants . HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE ) ) {
System . setProperty ( Constants . JAVA_SECURITY_KRB5_CONF ,
PropertyUtils . getString ( Constants . JAVA_SECURITY_KRB5_CONF_PATH ) ) ;
configuration . set ( Constants . HADOOP_SECURITY_AUTHENTICATION , "kerberos" ) ;
UserGroupInformation . setConfiguration ( configuration ) ;
UserGroupInformation . loginUserFromKeytab ( PropertyUtils . getString ( Constants . LOGIN_USER_KEY_TAB_USERNAME ) ,
PropertyUtils . getString ( Constants . LOGIN_USER_KEY_TAB_PATH ) ) ;
}
try {
configuration = new Configuration ( ) ;
String resUploadStartupType = PropertyUtils . getString ( Constants . RES_UPLOAD_STARTUP_TYPE ) ;
ResUploadType resUploadType = ResUploadType . valueOf ( resUploadStartupType ) ;
if ( resUploadType = = ResUploadType . HDFS ) {
if ( PropertyUtils . getBoolean ( Constants . HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE ) ) {
System . setProperty ( Constants . JAVA_SECURITY_KRB5_CONF ,
PropertyUtils . getString ( Constants . JAVA_SECURITY_KRB5_CONF_PATH ) ) ;
configuration . set ( Constants . HADOOP_SECURITY_AUTHENTICATION , "kerberos" ) ;
UserGroupInformation . setConfiguration ( configuration ) ;
UserGroupInformation . loginUserFromKeytab ( PropertyUtils . getString ( Constants . LOGIN_USER_KEY_TAB_USERNAME ) ,
PropertyUtils . getString ( Constants . LOGIN_USER_KEY_TAB_PATH ) ) ;
}
String defaultFS = configuration . get ( Constants . FS_DEFAULTFS ) ;
//first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
// the default is the local file system
if ( defaultFS . startsWith ( "file" ) ) {
String defaultFSProp = PropertyUtils . getString ( Constants . FS_DEFAULTFS ) ;
if ( StringUtils . isNotBlank ( defaultFSProp ) ) {
Map < String , String > fsRelatedProps = PropertyUtils . getPrefixedProperties ( "fs." ) ;
configuration . set ( Constants . FS_DEFAULTFS , defaultFSProp ) ;
fsRelatedProps . forEach ( ( key , value ) - > configuration . set ( key , value ) ) ;
} else {
logger . error ( "property:{} can not to be empty, please set!" , Constants . FS_DEFAULTFS ) ;
throw new RuntimeException (
String . format ( "property: %s can not to be empty, please set!" , Constants . FS_DEFAULTFS )
) ;
}
} else {
logger . info ( "get property:{} -> {}, from core-site.xml hdfs-site.xml " , Constants . FS_DEFAULTFS , defaultFS ) ;
}
String defaultFS = configuration . get ( Constants . FS_DEFAULTFS ) ;
//first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
// the default is the local file system
if ( defaultFS . startsWith ( "file" ) ) {
String defaultFSProp = PropertyUtils . getString ( Constants . FS_DEFAULTFS ) ;
if ( StringUtils . isNotBlank ( defaultFSProp ) ) {
Map < String , String > fsRelatedProps = PropertyUtils . getPrefixedProperties ( "fs." ) ;
configuration . set ( Constants . FS_DEFAULTFS , defaultFSProp ) ;
fsRelatedProps . forEach ( ( key , value ) - > configuration . set ( key , value ) ) ;
} else {
logger . error ( "property:{} can not to be empty, please set!" , Constants . FS_DEFAULTFS ) ;
throw new RuntimeException (
String . format ( "property: %s can not to be empty, please set!" , Constants . FS_DEFAULTFS )
) ;
}
} else {
logger . info ( "get property:{} -> {}, from core-site.xml hdfs-site.xml " , Constants . FS_DEFAULTFS , defaultFS ) ;
}
if ( fs = = null ) {
if ( StringUtils . isNotEmpty ( hdfsUser ) ) {
//UserGroupInformation ugi = UserGroupInformation.createProxyUser(hdfsUser,UserGroupInformation.getLoginUser());
UserGroupInformation ugi = UserGroupInformation . createRemoteUser ( hdfsUser ) ;
ugi . doAs ( new PrivilegedExceptionAction < Boolean > ( ) {
@Override
public Boolean run ( ) throws Exception {
fs = FileSystem . get ( configuration ) ;
return true ;
}
} ) ;
} else {
logger . warn ( "hdfs.root.user is not set value!" ) ;
fs = FileSystem . get ( configuration ) ;
}
if ( fs = = null ) {
if ( StringUtils . isNotEmpty ( hdfsUser ) ) {
UserGroupInformation ugi = UserGroupInformation . createRemoteUser ( hdfsUser ) ;
ugi . doAs ( new PrivilegedExceptionAction < Boolean > ( ) {
@Override
public Boolean run ( ) throws Exception {
fs = FileSystem . get ( configuration ) ;
return true ;
}
} else if ( resUploadType = = ResUploadType . S3 ) {
configuration . set ( Constants . FS_DEFAULTFS , PropertyUtils . getString ( Constants . FS_DEFAULTFS ) ) ;
configuration . set ( Constants . FS_S3A_ENDPOINT , PropertyUtils . getString ( Constants . FS_S3A_ENDPOINT ) ) ;
configuration . set ( Constants . FS_S3A_ACCESS_KEY , PropertyUtils . getString ( Constants . FS_S3A_ACCESS_KEY ) ) ;
configuration . set ( Constants . FS_S3A_SECRET_KEY , PropertyUtils . getString ( Constants . FS_S3A_SECRET_KEY ) ) ;
fs = FileSystem . get ( configuration ) ;
}
String rmHaIds = PropertyUtils . getString ( Constants . YARN_RESOURCEMANAGER_HA_RM_IDS ) ;
String appAddress = PropertyUtils . getString ( Constants . YARN_APPLICATION_STATUS_ADDRESS ) ;
if ( ! StringUtils . isEmpty ( rmHaIds ) ) {
appAddress = getAppAddress ( appAddress , rmHaIds ) ;
logger . info ( "appAddress : {}" , appAddress ) ;
}
configuration . set ( Constants . YARN_APPLICATION_STATUS_ADDRESS , appAddress ) ;
} catch ( Exception e ) {
logger . error ( e . getMessage ( ) , e ) ;
} ) ;
} else {
logger . warn ( "hdfs.root.user is not set value!" ) ;
fs = FileSystem . get ( configuration ) ;
}
}
} else if ( resUploadType = = ResUploadType . S3 ) {
configuration . set ( Constants . FS_DEFAULTFS , PropertyUtils . getString ( Constants . FS_DEFAULTFS ) ) ;
configuration . set ( Constants . FS_S3A_ENDPOINT , PropertyUtils . getString ( Constants . FS_S3A_ENDPOINT ) ) ;
configuration . set ( Constants . FS_S3A_ACCESS_KEY , PropertyUtils . getString ( Constants . FS_S3A_ACCESS_KEY ) ) ;
configuration . set ( Constants . FS_S3A_SECRET_KEY , PropertyUtils . getString ( Constants . FS_S3A_SECRET_KEY ) ) ;
fs = FileSystem . get ( configuration ) ;
}
String rmHaIds = PropertyUtils . getString ( Constants . YARN_RESOURCEMANAGER_HA_RM_IDS ) ;
String appAddress = PropertyUtils . getString ( Constants . YARN_APPLICATION_STATUS_ADDRESS ) ;
if ( ! StringUtils . isEmpty ( rmHaIds ) ) {
appAddress = getAppAddress ( appAddress , rmHaIds ) ;
logger . info ( "appAddress : {}" , appAddress ) ;
}
configuration . set ( Constants . YARN_APPLICATION_STATUS_ADDRESS , appAddress ) ;
} catch ( Exception e ) {
logger . error ( e . getMessage ( ) , e ) ;
}
}
@ -188,15 +191,15 @@ public class HadoopUtils implements Closeable {
/ * *
* cat file on hdfs
*
* @param hdfsFilePath hdfs file path
* @param hdfsFilePath hdfs file path
* @return byte [ ] byte array
* @throws IOException errors
* /
public byte [ ] catFile ( String hdfsFilePath ) throws IOException {
if ( StringUtils . isBlank ( hdfsFilePath ) ) {
logger . error ( "hdfs file path:{} is blank" , hdfsFilePath ) ;
return null ;
if ( StringUtils . isBlank ( hdfsFilePath ) ) {
logger . error ( "hdfs file path:{} is blank" , hdfsFilePath ) ;
return new byte [ 0 ] ;
}
FSDataInputStream fsDataInputStream = fs . open ( new Path ( hdfsFilePath ) ) ;
@ -204,29 +207,28 @@ public class HadoopUtils implements Closeable {
}
/ * *
* cat file on hdfs
*
* @param hdfsFilePath hdfs file path
* @param skipLineNums skip line numbers
* @param limit read how many lines
* @param hdfsFilePath hdfs file path
* @param skipLineNums skip line numbers
* @param limit read how many lines
* @return content of file
* @throws IOException errors
* /
public List < String > catFile ( String hdfsFilePath , int skipLineNums , int limit ) throws IOException {
if ( StringUtils . isBlank ( hdfsFilePath ) ) {
logger . error ( "hdfs file path:{} is blank" , hdfsFilePath ) ;
return null ;
if ( StringUtils . isBlank ( hdfsFilePath ) ) {
logger . error ( "hdfs file path:{} is blank" , hdfsFilePath ) ;
return Collections . emptyList ( ) ;
}
try ( FSDataInputStream in = fs . open ( new Path ( hdfsFilePath ) ) ) {
try ( FSDataInputStream in = fs . open ( new Path ( hdfsFilePath ) ) ) {
BufferedReader br = new BufferedReader ( new InputStreamReader ( in ) ) ;
Stream < String > stream = br . lines ( ) . skip ( skipLineNums ) . limit ( limit ) ;
return stream . collect ( Collectors . toList ( ) ) ;
}
}
/ * *
@ -259,17 +261,17 @@ public class HadoopUtils implements Closeable {
/ * *
* the src file is on the local disk . Add it to FS at
* the given dst name .
* @param srcFile local file
* @param dstHdfsPath destination hdfs path
* @param deleteSource whether to delete the src
* @param overwrite whether to overwrite an existing file
*
* @param srcFile local file
* @param dstHdfsPath destination hdfs path
* @param deleteSource whether to delete the src
* @param overwrite whether to overwrite an existing file
* @return if success or not
* @throws IOException errors
* /
public boolean copyLocalToHdfs ( String srcFile , String dstHdfsPath , boolean deleteSource , boolean overwrite ) throws IOException {
Path srcPath = new Path ( srcFile ) ;
Path dstPath = new Path ( dstHdfsPath ) ;
Path dstPath = new Path ( dstHdfsPath ) ;
fs . copyFromLocalFile ( deleteSource , overwrite , srcPath , dstPath ) ;
@ -279,10 +281,10 @@ public class HadoopUtils implements Closeable {
/ * *
* copy hdfs file to local
*
* @param srcHdfsFilePath source hdfs file path
* @param dstFile destination file
* @param deleteSource delete source
* @param overwrite overwrite
* @param srcHdfsFilePath source hdfs file path
* @param dstFile destination file
* @param deleteSource delete source
* @param overwrite overwrite
* @return result of copy hdfs file to local
* @throws IOException errors
* /
@ -293,14 +295,14 @@ public class HadoopUtils implements Closeable {
if ( dstPath . exists ( ) ) {
if ( dstPath . isFile ( ) ) {
if ( overwrite ) {
dstPath . delete ( ) ;
Files . delete ( dstPath . toPath ( ) ) ;
}
} else {
logger . error ( "destination file must be a file" ) ;
}
}
if ( ! dstPath . getParentFile ( ) . exists ( ) ) {
if ( ! dstPath . getParentFile ( ) . exists ( ) ) {
dstPath . getParentFile ( ) . mkdirs ( ) ;
}
@ -308,14 +310,13 @@ public class HadoopUtils implements Closeable {
}
/ * *
*
* delete a file
*
* @param hdfsFilePath the path to delete .
* @param recursive if path is a directory and set to
* true , the directory is deleted else throws an exception . In
* case of a file the recursive can be set to either true or false .
* @return true if delete is successful else false .
* @param recursive if path is a directory and set to
* true , the directory is deleted else throws an exception . In
* case of a file the recursive can be set to either true or false .
* @return true if delete is successful else false .
* @throws IOException errors
* /
public boolean delete ( String hdfsFilePath , boolean recursive ) throws IOException {
@ -340,7 +341,7 @@ public class HadoopUtils implements Closeable {
* @return { @link FileStatus } file status
* @throws Exception errors
* /
public FileStatus [ ] listFileStatus ( String filePath ) throws Exception {
public FileStatus [ ] listFileStatus ( String filePath ) throws Exception {
try {
return fs . listStatus ( new Path ( filePath ) ) ;
} catch ( IOException e ) {
@ -352,10 +353,11 @@ public class HadoopUtils implements Closeable {
/ * *
* Renames Path src to Path dst . Can take place on local fs
* or remote DFS .
*
* @param src path to be renamed
* @param dst new path after rename
* @throws IOException on failure
* @return true if rename is successful
* @throws IOException on failure
* /
public boolean rename ( String src , String dst ) throws IOException {
return fs . rename ( new Path ( src ) , new Path ( dst ) ) ;
@ -378,7 +380,7 @@ public class HadoopUtils implements Closeable {
String responseContent = HttpUtils . get ( applicationUrl ) ;
JSONObject jsonObject = JSONObject . parseObject ( responseContent ) ;
JSONObject jsonObject = JSON . parseObject ( responseContent ) ;
String result = jsonObject . getJSONObject ( "app" ) . getString ( "finalStatus" ) ;
switch ( result ) {
@ -401,7 +403,6 @@ public class HadoopUtils implements Closeable {
}
/ * *
*
* @return data hdfs path
* /
public static String getHdfsDataBasePath ( ) {
@ -428,11 +429,11 @@ public class HadoopUtils implements Closeable {
* hdfs user dir
*
* @param tenantCode tenant code
* @param userId user id
* @param userId user id
* @return hdfs resource dir
* /
public static String getHdfsUserDir ( String tenantCode , int userId ) {
return String . format ( "%s/home/%d" , getHdfsTenantDir ( tenantCode ) , userId ) ;
public static String getHdfsUserDir ( String tenantCode , int userId ) {
return String . format ( "%s/home/%d" , getHdfsTenantDir ( tenantCode ) , userId ) ;
}
/ * *
@ -480,7 +481,7 @@ public class HadoopUtils implements Closeable {
* getAppAddress
*
* @param appAddress app address
* @param rmHa resource manager ha
* @param rmHa resource manager ha
* @return app address
* /
public static String getAppAddress ( String appAddress , String rmHa ) {
@ -525,8 +526,6 @@ public class HadoopUtils implements Closeable {
* /
private static final class YarnHAAdminUtils extends RMAdminCLI {
private static final Logger logger = LoggerFactory . getLogger ( YarnHAAdminUtils . class ) ;
/ * *
* get active resourcemanager
*
@ -585,8 +584,7 @@ public class HadoopUtils implements Closeable {
JSONObject jsonObject = JSON . parseObject ( retStr ) ;
//get ResourceManager state
String state = jsonObject . getJSONObject ( "clusterInfo" ) . getString ( "haState" ) ;
return state ;
return jsonObject . getJSONObject ( "clusterInfo" ) . getString ( "haState" ) ;
}
}