@ -70,7 +70,6 @@ public class HadoopUtils implements Closeable {
public static final String resourceUploadPath = PropertyUtils . getString ( RESOURCE_UPLOAD_PATH , "/dolphinscheduler" ) ;
public static final String resourceUploadPath = PropertyUtils . getString ( RESOURCE_UPLOAD_PATH , "/dolphinscheduler" ) ;
public static final String rmHaIds = PropertyUtils . getString ( Constants . YARN_RESOURCEMANAGER_HA_RM_IDS ) ;
public static final String rmHaIds = PropertyUtils . getString ( Constants . YARN_RESOURCEMANAGER_HA_RM_IDS ) ;
public static final String appAddress = PropertyUtils . getString ( Constants . YARN_APPLICATION_STATUS_ADDRESS ) ;
public static final String appAddress = PropertyUtils . getString ( Constants . YARN_APPLICATION_STATUS_ADDRESS ) ;
public static final String jobHistoryAddress = PropertyUtils . getString ( Constants . YARN_JOB_HISTORY_STATUS_ADDRESS ) ;
private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY" ;
private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY" ;
@ -123,7 +122,7 @@ public class HadoopUtils implements Closeable {
try {
try {
configuration = new HdfsConfiguration ( ) ;
configuration = new HdfsConfiguration ( ) ;
String resourceStorageType = PropertyUtils . getUpperCase String ( Constants . RESOURCE_STORAGE_TYPE ) ;
String resourceStorageType = PropertyUtils . getString ( Constants . RESOURCE_STORAGE_TYPE ) ;
ResUploadType resUploadType = ResUploadType . valueOf ( resourceStorageType ) ;
ResUploadType resUploadType = ResUploadType . valueOf ( resourceStorageType ) ;
if ( resUploadType = = ResUploadType . HDFS ) {
if ( resUploadType = = ResUploadType . HDFS ) {
@ -156,18 +155,22 @@ public class HadoopUtils implements Closeable {
logger . info ( "get property:{} -> {}, from core-site.xml hdfs-site.xml " , Constants . FS_DEFAULTFS , defaultFS ) ;
logger . info ( "get property:{} -> {}, from core-site.xml hdfs-site.xml " , Constants . FS_DEFAULTFS , defaultFS ) ;
}
}
if ( fs = = null ) {
if ( StringUtils . isNotEmpty ( hdfsUser ) ) {
if ( StringUtils . isNotEmpty ( hdfsUser ) ) {
UserGroupInformation ugi = UserGroupInformation . createRemoteUser ( hdfsUser ) ;
UserGroupInformation ugi = UserGroupInformation . createRemoteUser ( hdfsUser ) ;
ugi . doAs ( ( PrivilegedExceptionAction < Boolean > ) ( ) - > {
ugi . doAs ( new PrivilegedExceptionAction < Boolean > ( ) {
@Override
public Boolean run ( ) throws Exception {
fs = FileSystem . get ( configuration ) ;
fs = FileSystem . get ( configuration ) ;
return true ;
return true ;
}
} ) ;
} ) ;
} else {
} else {
logger . warn ( "hdfs.root.user is not set value!" ) ;
logger . warn ( "hdfs.root.user is not set value!" ) ;
fs = FileSystem . get ( configuration ) ;
fs = FileSystem . get ( configuration ) ;
}
}
}
} else if ( resUploadType = = ResUploadType . S3 ) {
} else if ( resUploadType = = ResUploadType . S3 ) {
System . setProperty ( Constants . AWS_S3_V4 , Constants . STRING_TRUE ) ;
configuration . set ( Constants . FS_DEFAULTFS , PropertyUtils . getString ( Constants . FS_DEFAULTFS ) ) ;
configuration . set ( Constants . FS_DEFAULTFS , PropertyUtils . getString ( Constants . FS_DEFAULTFS ) ) ;
configuration . set ( Constants . FS_S3A_ENDPOINT , PropertyUtils . getString ( Constants . FS_S3A_ENDPOINT ) ) ;
configuration . set ( Constants . FS_S3A_ENDPOINT , PropertyUtils . getString ( Constants . FS_S3A_ENDPOINT ) ) ;
configuration . set ( Constants . FS_S3A_ACCESS_KEY , PropertyUtils . getString ( Constants . FS_S3A_ACCESS_KEY ) ) ;
configuration . set ( Constants . FS_S3A_ACCESS_KEY , PropertyUtils . getString ( Constants . FS_S3A_ACCESS_KEY ) ) ;
@ -201,23 +204,23 @@ public class HadoopUtils implements Closeable {
* if rmHaIds is empty , single resourcemanager enabled
* if rmHaIds is empty , single resourcemanager enabled
* if rmHaIds not empty : resourcemanager HA enabled
* if rmHaIds not empty : resourcemanager HA enabled
* /
* /
String appUrl = "" ;
if ( StringUtils . isEmpty ( rmHaIds ) ) {
//single resourcemanager enabled
appUrl = appAddress ;
yarnEnabled = true ;
yarnEnabled = true ;
String appUrl = StringUtils . isEmpty ( rmHaIds ) ? appAddress : getAppAddress ( appAddress , rmHaIds ) ;
} else {
if ( StringUtils . isBlank ( appUrl ) ) {
//resourcemanager HA enabled
throw new Exception ( "yarn application url generation failed" ) ;
appUrl = getAppAddress ( appAddress , rmHaIds ) ;
}
yarnEnabled = true ;
if ( logger . isDebugEnabled ( ) ) {
logger . info ( "application url : {}" , appUrl ) ;
logger . debug ( "yarn application url:{}, applicationId:{}" , appUrl , applicationId ) ;
}
String activeResourceManagerPort = String . valueOf ( PropertyUtils . getInt ( Constants . HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT , 8088 ) ) ;
return String . format ( appUrl , activeResourceManagerPort , applicationId ) ;
}
}
public String getJobHistoryUrl ( String applicationId ) {
if ( StringUtils . isBlank ( appUrl ) ) {
//eg:application_1587475402360_712719 -> job_1587475402360_712719
throw new Exception ( "application url is blank" ) ;
String jobId = applicationId . replace ( "application" , "job" ) ;
}
return String . format ( jobHistoryAddress , job Id) ;
return String . format ( appUrl , application Id) ;
}
}
/ * *
/ * *