@@ -16,6 +16,9 @@
  */
 package org.apache.dolphinscheduler.common.utils;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.ResUploadType;
@@ -37,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -48,30 +52,37 @@ public class HadoopUtils implements Closeable {
 
     private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
 
-    private static HadoopUtils instance = new HadoopUtils();
+    private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
+
+    private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder
+            .newBuilder()
+            .expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 7), TimeUnit.DAYS)
+            .build(new CacheLoader<String, HadoopUtils>() {
+                @Override
+                public HadoopUtils load(String key) throws Exception {
+                    return new HadoopUtils();
+                }
+            });
+
     private static Configuration configuration;
     private static FileSystem fs;
-    private String hdfsUser;
+    private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
 
     private HadoopUtils() {
-        hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
         init();
         initHdfsPath();
     }
 
     public static HadoopUtils getInstance() {
-        // if kerberos startup , renew HadoopUtils
-        if (CommonUtils.getKerberosStartupState()) {
-            return new HadoopUtils();
-        }
-        return instance;
+        return cache.getUnchecked(HADOOP_UTILS_KEY);
     }
 
     /**
      * init dolphinscheduler root path in hdfs
      */
     private void initHdfsPath() {
         String hdfsPath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH);
         Path path = new Path(hdfsPath);
@@ -80,7 +91,7 @@ public class HadoopUtils implements Closeable {
                 fs.mkdirs(path);
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
     }
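
Illustrative aside (hypothetical names, not taken from this patch): the LoadingCache introduced in the hunk above makes getInstance() hand back a HadoopUtils that is rebuilt once the configured expiry elapses, so the Kerberos login in init() is redone periodically instead of once per JVM. A minimal sketch of the same Guava pattern, using an illustrative FileSystem value and key:

    // Entries expire a fixed time after being written; the next getUnchecked()
    // call runs load() again and hands back a freshly built value.
    LoadingCache<String, FileSystem> renewable = CacheBuilder.newBuilder()
            .expireAfterWrite(7, TimeUnit.DAYS)
            .build(new CacheLoader<String, FileSystem>() {
                @Override
                public FileSystem load(String key) throws Exception {
                    return FileSystem.get(new Configuration()); // stand-in for new HadoopUtils()
                }
            });
    FileSystem fs = renewable.getUnchecked("some-key");
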
@@ -88,82 +99,75 @@ public class HadoopUtils implements Closeable {
 
     /**
      * init hadoop configuration
      */
-    private void init() {
-        if (configuration == null) {
-            synchronized (HadoopUtils.class) {
-                if (configuration == null) {
-                    try {
-                        configuration = new Configuration();
+    private static void init() {
+        try {
+            configuration = new Configuration();
 
             String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
             ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
 
             if (resUploadType == ResUploadType.HDFS) {
                 if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)) {
                     System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
                             PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
                     configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
                     UserGroupInformation.setConfiguration(configuration);
                     UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
                             PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
                 }
 
                 String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
                 //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
                 // the default is the local file system
                 if (defaultFS.startsWith("file")) {
                     String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
                     if (StringUtils.isNotBlank(defaultFSProp)) {
                         Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
                         configuration.set(Constants.FS_DEFAULTFS, defaultFSProp);
                         fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
                     } else {
                         logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS);
                         throw new RuntimeException(
                                 String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
                         );
                     }
                 } else {
                     logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
                 }
 
                 if (fs == null) {
                     if (StringUtils.isNotEmpty(hdfsUser)) {
                         UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
                         ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
                             @Override
                             public Boolean run() throws Exception {
                                 fs = FileSystem.get(configuration);
                                 return true;
                             }
                         });
                     } else {
                         logger.warn("hdfs.root.user is not set value!");
                         fs = FileSystem.get(configuration);
                     }
                 }
             } else if (resUploadType == ResUploadType.S3) {
                 configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
                 configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
                 configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
                 configuration.set(Constants.FS_S3A_SECRET_KEY, PropertyUtils.getString(Constants.FS_S3A_SECRET_KEY));
                 fs = FileSystem.get(configuration);
             }
 
             String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
             String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
             if (!StringUtils.isEmpty(rmHaIds)) {
                 appAddress = getAppAddress(appAddress, rmHaIds);
                 logger.info("appAddress : {}", appAddress);
             }
             configuration.set(Constants.YARN_APPLICATION_STATUS_ADDRESS, appAddress);
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
-                }
-            }
-        }
     }
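
Illustrative aside (placeholder principal and keytab, not taken from this patch): with the double-checked locking removed, every HadoopUtils the cache builds re-runs init() and so refreshes the Kerberos login. Stripped of the upload-type branching, that login path is the standard UGI sequence, roughly:

    // Point the JVM at the krb5 config, switch Hadoop to Kerberos authentication,
    // log in from the keytab, then obtain the FileSystem under that login.
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "kerberos");
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab("dolphin@EXAMPLE.COM", "/etc/security/keytabs/dolphin.keytab");
    FileSystem fs = FileSystem.get(conf);
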
@@ -187,14 +191,14 @@ public class HadoopUtils implements Closeable {
     /**
      * cat file on hdfs
      *
      * @param hdfsFilePath hdfs file path
      * @return byte[] byte array
      * @throws IOException errors
      */
     public byte[] catFile(String hdfsFilePath) throws IOException {
 
         if (StringUtils.isBlank(hdfsFilePath)) {
             logger.error("hdfs file path:{} is blank", hdfsFilePath);
             return new byte[0];
         }
@@ -203,29 +207,28 @@ public class HadoopUtils implements Closeable {
     }
 
     /**
      * cat file on hdfs
      *
      * @param hdfsFilePath hdfs file path
      * @param skipLineNums skip line numbers
      * @param limit        read how many lines
      * @return content of file
      * @throws IOException errors
      */
     public List<String> catFile(String hdfsFilePath, int skipLineNums, int limit) throws IOException {
 
         if (StringUtils.isBlank(hdfsFilePath)) {
             logger.error("hdfs file path:{} is blank", hdfsFilePath);
             return Collections.emptyList();
         }
 
         try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))) {
             BufferedReader br = new BufferedReader(new InputStreamReader(in));
             Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
             return stream.collect(Collectors.toList());
         }
     }
 
     /**
@@ -258,17 +261,17 @@ public class HadoopUtils implements Closeable {
     /**
      * the src file is on the local disk. Add it to FS at
      * the given dst name.
      *
      * @param srcFile      local file
      * @param dstHdfsPath  destination hdfs path
      * @param deleteSource whether to delete the src
      * @param overwrite    whether to overwrite an existing file
      * @return if success or not
      * @throws IOException errors
      */
     public boolean copyLocalToHdfs(String srcFile, String dstHdfsPath, boolean deleteSource, boolean overwrite) throws IOException {
         Path srcPath = new Path(srcFile);
         Path dstPath = new Path(dstHdfsPath);
 
         fs.copyFromLocalFile(deleteSource, overwrite, srcPath, dstPath);
@@ -278,10 +281,10 @@ public class HadoopUtils implements Closeable {
     /**
      * copy hdfs file to local
      *
      * @param srcHdfsFilePath source hdfs file path
      * @param dstFile         destination file
      * @param deleteSource    delete source
      * @param overwrite       overwrite
      * @return result of copy hdfs file to local
      * @throws IOException errors
      */
@@ -299,7 +302,7 @@ public class HadoopUtils implements Closeable {
             }
         }
 
         if (!dstPath.getParentFile().exists()) {
             dstPath.getParentFile().mkdirs();
         }
@@ -307,14 +310,13 @@ public class HadoopUtils implements Closeable {
     }
 
     /**
-     *
      * delete a file
      *
      * @param hdfsFilePath the path to delete.
      * @param recursive    if path is a directory and set to
      *                     true, the directory is deleted else throws an exception. In
      *                     case of a file the recursive can be set to either true or false.
      * @return true if delete is successful else false.
      * @throws IOException errors
      */
     public boolean delete(String hdfsFilePath, boolean recursive) throws IOException {
@@ -339,7 +341,7 @@ public class HadoopUtils implements Closeable {
      * @return {@link FileStatus} file status
      * @throws Exception errors
      */
     public FileStatus[] listFileStatus(String filePath) throws Exception {
         try {
             return fs.listStatus(new Path(filePath));
         } catch (IOException e) {
@@ -351,10 +353,11 @@ public class HadoopUtils implements Closeable {
     /**
      * Renames Path src to Path dst. Can take place on local fs
      * or remote DFS.
+     *
      * @param src path to be renamed
      * @param dst new path after rename
-     * @throws IOException on failure
      * @return true if rename is successful
+     * @throws IOException on failure
      */
     public boolean rename(String src, String dst) throws IOException {
         return fs.rename(new Path(src), new Path(dst));
@@ -400,7 +403,6 @@ public class HadoopUtils implements Closeable {
     }
 
     /**
-     *
      * @return data hdfs path
      */
     public static String getHdfsDataBasePath() {
@@ -427,11 +429,11 @@ public class HadoopUtils implements Closeable {
      * hdfs user dir
      *
      * @param tenantCode tenant code
      * @param userId     user id
      * @return hdfs resource dir
      */
     public static String getHdfsUserDir(String tenantCode, int userId) {
         return String.format("%s/home/%d", getHdfsTenantDir(tenantCode), userId);
     }
 
     /**
@@ -479,7 +481,7 @@ public class HadoopUtils implements Closeable {
      * getAppAddress
      *
      * @param appAddress app address
      * @param rmHa       resource manager ha
      * @return app address
      */
     public static String getAppAddress(String appAddress, String rmHa) {