Browse Source

Pull request #401: REPORT-102146 && JSY-28624 fix: isDirectory方法某些场景判断不正确;优化下分片上传逻辑

Merge in PG/plugin-repository-s3 from ~AFLY/plugin-repository-s3:release/11.0 to release/11.0

* commit '8051a92c88db7c113c585ddb55436d9b0f5d1098':
  JSY-28624 fix: 优化下分片上传逻辑
  REPORT-102146 fix: isDirectory方法某些场景判断不正确
release/11.0
Afly-储泓飞 2 years ago
parent
commit
cfc791698d
  1. 138
      src/main/java/com/fanruan/fs/s3/repository/core/S3ResourceRepository.java

138
src/main/java/com/fanruan/fs/s3/repository/core/S3ResourceRepository.java

@@ -27,7 +27,6 @@ import com.fanruan.api.util.StringKit;
import com.fr.io.repository.FineFileEntry; import com.fr.io.repository.FineFileEntry;
import com.fr.io.repository.base.BaseResourceRepository; import com.fr.io.repository.base.BaseResourceRepository;
import com.fr.io.utils.ResourceIOUtils; import com.fr.io.utils.ResourceIOUtils;
import com.fr.log.FineLoggerFactory;
import com.fr.stable.Filter; import com.fr.stable.Filter;
import com.fr.stable.StringUtils; import com.fr.stable.StringUtils;
import com.fr.third.org.apache.commons.io.output.NullOutputStream; import com.fr.third.org.apache.commons.io.output.NullOutputStream;
@@ -52,7 +51,7 @@ public class S3ResourceRepository extends BaseResourceRepository {
// Size of each multipart-upload part. 5 MB is the S3 minimum for every part
// except the last one, so PART_SIZE must not be lowered below that.
private static final int PART_SIZE = 5 * 1024 * 1024;
// Payloads at or below this threshold (100 MB) are written with a single PUT;
// larger ones go through multipart upload. NOTE(review): raised from 4 to 20
// parts in this change — presumably to avoid multipart overhead for mid-size
// files; confirm against JSY-28624.
private static final int MULTIPART_UPLOAD_LIMIT = 20 * PART_SIZE;
// Key separator used to emulate directories on the flat S3 namespace.
private static final String DELIMITER = "/";
@@ -155,75 +154,69 @@ public class S3ResourceRepository extends BaseResourceRepository {
@Override @Override
public void write(String path, byte[] data) { public void write(String path, byte[] data) {
ObjectMetadata metadata; int length = data.length;
try { if (length > MULTIPART_UPLOAD_LIMIT) {
metadata = s3.getObjectMetadata(bucket, path); multipartUpload(path, new ByteArrayInputStream(data));
} catch (Exception e) { } else {
metadata = new ObjectMetadata(); ObjectMetadata metadata;
String mimeType = URLConnection.guessContentTypeFromName(path); try {
if (mimeType != null) { metadata = s3.getObjectMetadata(bucket, path);
metadata.setContentType(mimeType); } catch (Exception e) {
metadata = new ObjectMetadata();
String mimeType = URLConnection.guessContentTypeFromName(path);
if (mimeType != null) {
metadata.setContentType(mimeType);
}
} }
if (metadata != null) {
metadata.setContentLength(length);
}
s3.putObject(bucket, path, new ByteArrayInputStream(data), metadata);
} }
if (metadata != null) {
metadata.setContentLength(data.length);
}
s3.putObject(bucket, path, new ByteArrayInputStream(data), metadata);
} }
@Override @Override
public void write(String path, InputStream inputStream) throws ResourceIOException { public void write(String path, InputStream inputStream) throws ResourceIOException {
long dataLength = 0; multipartUpload(path, inputStream);
}
private void multipartUpload(String path, InputStream inputStream) {
try { try {
dataLength = inputStream.available(); // Step 1: 初始化分片上传
} catch (IOException e) { InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, path);
FineLoggerFactory.getLogger().error(e.getMessage(), e); InitiateMultipartUploadResult initResponse = s3.initiateMultipartUpload(initRequest);
} String uploadId = initResponse.getUploadId();
//超过一定大小才使用分片上传,小文件来说,网络传输时间可能较短,且上传失败的风险相对较低。
//在网络稳定的情况下,使用分片上传可能没有太多的优势,反而增加了额外开销和复杂性 // Step 2: 分片上传文件
if (dataLength > MULTIPART_UPLOAD_LIMIT) { List<PartETag> partETags = new ArrayList<>();
try { byte[] buffer = new byte[PART_SIZE];
// Step 1: 初始化分片上传 int bytesRead;
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, path); int partNumber = 1;
InitiateMultipartUploadResult initResponse = s3.initiateMultipartUpload(initRequest);
String uploadId = initResponse.getUploadId(); while ((bytesRead = inputStream.read(buffer)) > 0) {
// 创建上传请求
// Step 2: 分片上传文件 UploadPartRequest uploadRequest = new UploadPartRequest()
List<PartETag> partETags = new ArrayList<>(); .withBucketName(bucket)
long position = 0; .withKey(path)
.withUploadId(uploadId)
for (int partNumber = 1; position < dataLength; partNumber++) { .withPartNumber(partNumber)
// 最后一个分片可能小于5MB .withInputStream(new ByteArrayInputStream(buffer, 0, bytesRead))
long partSizeBytes = Math.min(PART_SIZE, dataLength - position); .withPartSize(bytesRead);
byte[] bytes = new byte[(int) partSizeBytes];
inputStream.read(bytes);
// 创建上传请求
UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(bucket)
.withKey(path)
.withUploadId(uploadId)
.withPartNumber(partNumber)
.withInputStream(new ByteArrayInputStream(bytes))
.withPartSize(partSizeBytes);
// 上传分片
UploadPartResult uploadResult = s3.uploadPart(uploadRequest);
partETags.add(uploadResult.getPartETag());
position += partSizeBytes;
}
// Step 3: 完成分片上传 // 上传分片
CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucket, path, uploadId, partETags); UploadPartResult uploadResult = s3.uploadPart(uploadRequest);
s3.completeMultipartUpload(compRequest); partETags.add(uploadResult.getPartETag());
} catch (IOException e) {
throw new ResourceIOException(e); partNumber++;
} finally {
ResourceIOUtils.close(inputStream);
} }
} else {
super.write(path, inputStream); // Step 3: 完成分片上传
CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucket, path, uploadId, partETags);
s3.completeMultipartUpload(compRequest);
} catch (IOException e) {
throw new ResourceIOException(e);
} finally {
ResourceIOUtils.close(inputStream);
} }
} }
@@ -399,19 +392,16 @@ public class S3ResourceRepository extends BaseResourceRepository {
@Override @Override
public boolean isDirectory(String path) { public boolean isDirectory(String path) {
if (path.endsWith(DELIMITER)) {
if (path.endsWith(DELIMITER) && exist(path)) { return exist(path);
return true;
}
ObjectListing listing = s3.listObjects(bucket, path);
if (listing.getObjectSummaries().isEmpty()) {
return false;
}
if (listing.getObjectSummaries().size() > 1) {
return true;
} else { } else {
S3ObjectSummary summary = listing.getObjectSummaries().get(0); ObjectListing listing = s3.listObjects(bucket, path);
return !StringKit.equals(listing.getPrefix(), summary.getKey()); List<S3ObjectSummary> objectSummaries = listing.getObjectSummaries();
if (objectSummaries.isEmpty()) {
return false;
}
String dirFormat = path + DELIMITER;
return objectSummaries.stream().anyMatch(s3ObjectSummary -> StringUtils.equals(s3ObjectSummary.getKey(), dirFormat));
} }
} }

Loading…
Cancel
Save