@ -37,7 +37,8 @@ public class S3ResourceRepository extends BaseResourceRepository {
private static final String DELIMITER = "/" ;
private final AmazonS3 s3 ;
private final AmazonS3 readClient ;
private final AmazonS3 writeClient ;
private final String bucket ;
@ -52,7 +53,8 @@ public class S3ResourceRepository extends BaseResourceRepository {
clientConfiguration . setUseTcpKeepAlive ( config . getKeepAlive ( ) ) ;
clientConfiguration . setMaxConnections ( config . getMaxConnection ( ) ) ;
amazonS3ClientBuilder = amazonS3ClientBuilder . withClientConfiguration ( clientConfiguration ) ;
this . s3 = amazonS3ClientBuilder . build ( ) ;
this . readClient = amazonS3ClientBuilder . build ( ) ;
this . writeClient = amazonS3ClientBuilder . build ( ) ;
this . bucket = config . getBucket ( ) ;
}
@ -64,7 +66,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
@Override
public FineFileEntry getEntry ( String path ) {
try {
ObjectMetadata metadata = s3 . getObjectMetadata ( bucket , path ) ;
ObjectMetadata metadata = readClient . getObjectMetadata ( bucket , path ) ;
return s3Object2FileEntry ( metadata , path ) ;
} catch ( Exception e ) {
FineLoggerFactory . getLogger ( ) . error ( e , "{} not exist!" , path ) ;
@ -83,7 +85,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
String nextMarker = null ;
ObjectListing objectListing ;
do {
objectListing = this . s3 . listObjects ( listObjectsRequest . withMarker ( nextMarker ) ) ;
objectListing = this . readClient . listObjects ( listObjectsRequest . withMarker ( nextMarker ) ) ;
for ( S3ObjectSummary summary : objectListing . getObjectSummaries ( ) ) {
String key = summary . getKey ( ) ;
if ( ! key . endsWith ( DELIMITER ) ) {
@ -110,7 +112,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
public InputStream read ( String filePath ) throws ResourceIOException {
GetObjectRequest request = new GetObjectRequest ( bucket , filePath ) ;
try {
return s3 . getObject ( request ) . getObjectContent ( ) ;
return readClient . getObject ( request ) . getObjectContent ( ) ;
} catch ( Exception e ) {
FineLoggerFactory . getLogger ( ) . error ( e , "[S3] read path {} is error!" , filePath ) ;
return new ByteArrayInputStream ( new byte [ 0 ] ) ;
@ -121,7 +123,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
public void write ( String path , byte [ ] data ) {
ObjectMetadata metadata ;
try {
metadata = this . s3 . getObjectMetadata ( this . bucket , path ) ;
metadata = this . readClient . getObjectMetadata ( this . bucket , path ) ;
} catch ( Exception e ) {
FineLoggerFactory . getLogger ( ) . warn ( "[S3] getObjectMetaData error ,path {}" , path ) ;
metadata = new ObjectMetadata ( ) ;
@ -133,18 +135,18 @@ public class S3ResourceRepository extends BaseResourceRepository {
if ( metadata ! = null ) {
metadata . setContentLength ( data . length ) ;
}
s3 . putObject ( bucket , path , new ByteArrayInputStream ( data ) , metadata ) ;
writeClient . putObject ( bucket , path , new ByteArrayInputStream ( data ) , metadata ) ;
}
@Override
public void write ( String path , InputStream inputStream ) throws ResourceIOException {
if ( null = = inputStream ) {
this . s3 . putObject ( this . bucket , path , "" ) ;
this . writeClient . putObject ( this . bucket , path , "" ) ;
} else {
ObjectMetadata metadata = new ObjectMetadata ( ) ;
try {
metadata . setContentLength ( inputStream . available ( ) ) ;
this . s3 . putObject ( this . bucket , path , inputStream , metadata ) ;
this . writeClient . putObject ( this . bucket , path , inputStream , metadata ) ;
} catch ( Exception e ) {
FineLoggerFactory . getLogger ( ) . info ( "S3 inputStream.putObject error,path is {}, error message {}" , path , e . getMessage ( ) ) ;
} finally {
@ -165,7 +167,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
String parent = paths . get ( i ) ;
PutObjectRequest req = new PutObjectRequest ( this . bucket , parent , new ByteArrayInputStream ( new byte [ 0 ] ) , buildEmptyMetadata ( ) ) ;
try {
this . s3 . putObject ( req ) ;
this . writeClient . putObject ( req ) ;
} catch ( Exception e ) {
FineLoggerFactory . getLogger ( ) . error ( e , "[S3] Failed to create parent path {}" , parent ) ;
return false ;
@ -184,7 +186,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
String parent = paths . get ( i ) ;
PutObjectRequest req = new PutObjectRequest ( bucket , parent , new ByteArrayInputStream ( new byte [ 0 ] ) , buildEmptyMetadata ( ) ) ;
try {
s3 . putObject ( req ) ;
writeClient . putObject ( req ) ;
} catch ( Exception e ) {
FineLoggerFactory . getLogger ( ) . error ( e , "[S3] Failed to create parent path {}" , parent ) ;
return false ;
@ -195,7 +197,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
@Override
public boolean delete ( String path ) {
this . s3 . deleteObject ( this . bucket , path ) ;
this . writeClient . deleteObject ( this . bucket , path ) ;
String prefix = path ;
if ( ! path . endsWith ( DELIMITER ) ) {
prefix = path + DELIMITER ;
@ -206,11 +208,11 @@ public class S3ResourceRepository extends BaseResourceRepository {
ObjectListing objectListing ;
String nextMarker = null ;
do {
objectListing = this . s3 . listObjects ( listObjectsRequest . withMarker ( nextMarker ) ) ;
objectListing = this . readClient . listObjects ( listObjectsRequest . withMarker ( nextMarker ) ) ;
for ( S3ObjectSummary summary : objectListing . getObjectSummaries ( ) ) {
String key = summary . getKey ( ) ;
if ( ! key . endsWith ( DELIMITER ) ) {
this . s3 . deleteObject ( this . bucket , key ) ;
this . writeClient . deleteObject ( this . bucket , key ) ;
}
}
for ( String pre : objectListing . getCommonPrefixes ( ) ) {
@ -221,14 +223,14 @@ public class S3ResourceRepository extends BaseResourceRepository {
} catch ( Exception e ) {
FineLoggerFactory . getLogger ( ) . error ( e , "[S3] delete error path {}" , path ) ;
}
this . s3 . deleteObject ( this . bucket , prefix ) ;
this . writeClient . deleteObject ( this . bucket , prefix ) ;
return true ;
}
@Override
public boolean exist ( String path ) {
try {
return this . s3 . doesObjectExist ( this . bucket , path ) | | this . s3 . doesObjectExist ( this . bucket , path + DELIMITER ) ;
return this . readClient . doesObjectExist ( this . bucket , path ) | | this . readClient . doesObjectExist ( this . bucket , path + DELIMITER ) ;
} catch ( Exception e ) {
FineLoggerFactory . getLogger ( ) . error ( e , "[S3] exist error path {}" , path ) ;
return false ;
@ -246,7 +248,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
String nextMarker = null ;
ObjectListing objectListing ;
do {
objectListing = this . s3 . listObjects ( listObjectsRequest . withMarker ( nextMarker ) ) ;
objectListing = this . readClient . listObjects ( listObjectsRequest . withMarker ( nextMarker ) ) ;
for ( S3ObjectSummary objectSummary : objectListing . getObjectSummaries ( ) ) {
String key = objectSummary . getKey ( ) ;
if ( StringKit . equals ( key , dir ) ) {
@ -286,7 +288,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
@Override
public long lastModified ( String path ) {
try {
S3Object s3Object = this . s3 . getObject ( this . bucket , path ) ;
S3Object s3Object = this . readClient . getObject ( this . bucket , path ) ;
if ( s3Object ! = null ) {
try {
return s3Object . getObjectMetadata ( ) . getLastModified ( ) . getTime ( ) ;
@ -303,7 +305,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
@Override
public long length ( String path ) {
try {
ObjectMetadata metadata = this . s3 . getObjectMetadata ( this . bucket , path ) ;
ObjectMetadata metadata = this . readClient . getObjectMetadata ( this . bucket , path ) ;
return metadata . getContentLength ( ) ;
} catch ( Exception e ) {
LogKit . info ( "[S3] {} not exist!" , path ) ;
@ -327,13 +329,14 @@ public class S3ResourceRepository extends BaseResourceRepository {
@Override
public boolean copyFile ( String origPath , String desPath ) throws ResourceIOException {
s3 . copyObject ( bucket , origPath , bucket , desPath ) ;
writeClient . copyObject ( bucket , origPath , bucket , desPath ) ;
return exist ( desPath ) ;
}
@Override
public void shutDown ( ) {
s3 . shutdown ( ) ;
readClient . shutdown ( ) ;
writeClient . shutdown ( ) ;
}
@Override