
Pull request #376: merge plugin code for version 11.0.19

Merge in PG/plugin-repository-s3 from release/11.0 to persist/11.0

* commit '3cf593f1c96a1ca7714f67329b7f09e6b3439dbd':
  REPORT-100817 fix: support for multipart upload in the S3 plugin
  REPORT-100817 fix: support for multipart upload in the S3 plugin
  REPORT-99625 fix: enablePathStyleAccess defaults to false
  No JIRA task chore: bump version
  No JIRA task fix: release resources after the connection test; filter out the directory itself in the list API
  No JIRA task: variable check
  REPORT-97765 feat: support configuring the connection pool size in the S3 plugin
  REPORT-97765 feat: support configuring the connection pool size in the S3 plugin
  enablePathStyleAccess validation issue
  console
  REPORT-97243 fix: value displayed as undefined
persist/11.0
Icey.Zhang-张洁 2 years ago
parent
commit
923dd4261a
  1. front/bundle.js (4 changes)
  2. plugin.xml (4 changes)
  3. src/main/java/com/fanruan/fs/s3/repository/core/S3Config.java (15 changes)
  4. src/main/java/com/fanruan/fs/s3/repository/core/S3RepositoryFactory.java (7 changes)
  5. src/main/java/com/fanruan/fs/s3/repository/core/S3ResourceRepository.java (90 changes)
  6. src/main/resources/com/fanruan/fs/s3/repository/web/js/bundle.js (2 changes)

front/bundle.js (4 changes)

@@ -28,7 +28,6 @@ BI.config("dec.constant.intelligence.cluster.file.server", function (items) {
render: function () {
var self = this, o = this.options;
console.log(this.model.enablePathStyleAccess);
return {
type: "bi.vertical",
@@ -252,7 +251,6 @@ BI.config("dec.constant.intelligence.cluster.file.server", function (items) {
state: function () {
var val = this.options.value;
console.log(this.options.value);
return {
endPoint: val.endPoint,
@@ -262,7 +260,7 @@ BI.config("dec.constant.intelligence.cluster.file.server", function (items) {
bucket: val.bucket,
workRoot: val.workRoot,
isOpen: false,
enablePathStyleAccess: val.enablePathStyleAccess,
enablePathStyleAccess: String(Boolean(val.enablePathStyleAccess)),
signerOverride: val.signerOverride,
};
},

plugin.xml (4 changes)

@@ -5,12 +5,14 @@
<main-package>com.fanruan.fs</main-package>
<active>yes</active>
<hidden>no</hidden>
<version>1.3.8</version>
<version>1.3.9</version>
<env-version>11.0~11.0</env-version>
<jartime>2023-03-14</jartime>
<vendor>richie</vendor>
<description><![CDATA[Use an S3-compatible cloud storage file system as the file server.]]></description>
<change-notes><![CDATA[
[2023-07-24] Support multipart upload for large files. <br/>
[2023-06-30] Fix an error when fetching the default configuration; filter out problematic paths. <br/>
[2023-03-28] Upgrade third-party components. <br/>
[2023-01-03] Improve file-write performance; fix incomplete display when there are too many files.<br/>
[2022-09-22] Upgrade third-party components. <br/>

src/main/java/com/fanruan/fs/s3/repository/core/S3Config.java (15 changes)

@@ -38,6 +38,9 @@ public class S3Config extends CommonRepoConfig {
@Identifier("signerOverride")
private Conf<String> signerOverride = HolderKit.simple(StringUtils.EMPTY);
@Identifier("maxConnections")
private Conf<Integer> maxConnections = HolderKit.simple(200);
@GetConfig("endPoint")
public String getEndPoint() {
return endPoint.get();
@@ -98,6 +101,16 @@ public class S3Config extends CommonRepoConfig {
this.signerOverride.set(signerOverride);
}
@GetConfig("maxConnections")
public int getMaxConnections() {
return maxConnections.get();
}
@SetConfig("maxConnections")
public void setMaxConnections(int maxConnections) {
this.maxConnections.set(maxConnections);
}
@Override
public void update(String key) {
super.update(key);
@@ -109,6 +122,7 @@ public class S3Config extends CommonRepoConfig {
this.setBucket(newConfig.getBucket());
this.setEnablePathStyleAccess(newConfig.isEnablePathStyleAccess());
this.setSignerOverride(newConfig.getSignerOverride());
this.setMaxConnections(newConfig.getMaxConnections());
}
}
@@ -121,6 +135,7 @@ public class S3Config extends CommonRepoConfig {
cloned.bucket = (Conf<String>) bucket.clone();
cloned.enablePathStyleAccess = (Conf<Boolean>) enablePathStyleAccess.clone();
cloned.signerOverride = (Conf<String>) signerOverride.clone();
cloned.maxConnections = (Conf<Integer>) maxConnections.clone();
return cloned;
}
}

src/main/java/com/fanruan/fs/s3/repository/core/S3RepositoryFactory.java (7 changes)

@@ -40,6 +40,7 @@ public class S3RepositoryFactory extends ConfigRepositoryFactory<S3Config> {
@Override
public boolean verifyConfig(S3Config config) {
AmazonS3 s3 = null;
try {
BasicAWSCredentials credentials = new BasicAWSCredentials(config.getAccessKeyId(), config.getPassword());
AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder.standard()
@@ -57,11 +58,15 @@ public class S3RepositoryFactory extends ConfigRepositoryFactory<S3Config> {
clientConfiguration.setProtocol(Protocol.HTTP);
}
amazonS3ClientBuilder = amazonS3ClientBuilder.withClientConfiguration(clientConfiguration);
AmazonS3 s3 = amazonS3ClientBuilder.build();
s3 = amazonS3ClientBuilder.build();
s3.listObjects(config.getBucket());
} catch (Exception e) {
LogKit.error(e.getMessage(), e);
return false;
} finally {
if (s3 != null) {
s3.shutdown();
}
}
return true;
}

src/main/java/com/fanruan/fs/s3/repository/core/S3ResourceRepository.java (90 changes)

@@ -7,23 +7,34 @@ import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.util.IOUtils;
import com.fanruan.api.log.LogKit;
import com.fanruan.api.util.StringKit;
import com.fr.io.repository.FineFileEntry;
import com.fr.io.repository.base.BaseResourceRepository;
import com.fr.io.utils.ResourceIOUtils;
import com.fr.log.FineLoggerFactory;
import com.fr.stable.Filter;
import com.fr.stable.StringUtils;
import com.fr.third.org.apache.commons.io.output.NullOutputStream;
import com.fr.workspace.resource.ResourceIOException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
@@ -39,6 +50,10 @@ public class S3ResourceRepository extends BaseResourceRepository {
private static final int PAGE_SIZE = 1000;
private static final int PART_SIZE = 5 * 1024 * 1024;
private static final int MULTIPART_UPLOAD_LIMIT = 4 * PART_SIZE;
private static final String DELIMITER = "/";
public static final String HTTP = "http:";
@@ -57,6 +72,8 @@ public class S3ResourceRepository extends BaseResourceRepository {
amazonS3ClientBuilder = amazonS3ClientBuilder.enablePathStyleAccess();
}
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setMaxConnections(config.getMaxConnections());
LogKit.info("Max connections is {}!", clientConfiguration.getMaxConnections());
if (StringUtils.isNotEmpty(config.getSignerOverride())) {
clientConfiguration.setSignerOverride(config.getSignerOverride());
}
@@ -95,15 +112,15 @@ public class S3ResourceRepository extends BaseResourceRepository {
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket)
.withPrefix(dir).withDelimiter(DELIMITER);
ObjectListing objectListing = s3.listObjects(listObjectsRequest);
collectFileEntry(result, objectListing);
collectFileEntry(dir, result, objectListing);
while (objectListing.isTruncated()) {
objectListing = s3.listNextBatchOfObjects(objectListing);
collectFileEntry(result, objectListing);
collectFileEntry(dir, result, objectListing);
}
return result.toArray(new FineFileEntry[0]);
}
private void collectFileEntry(List<FineFileEntry> result, ObjectListing objectListing) {
private void collectFileEntry(String dir, List<FineFileEntry> result, ObjectListing objectListing) {
for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
String key = summary.getKey();
if (!key.endsWith(DELIMITER)) {
@@ -111,11 +128,13 @@ public class S3ResourceRepository extends BaseResourceRepository {
}
}
for (String prefix : objectListing.getCommonPrefixes()) {
if (StringUtils.isNotEmpty(prefix.substring(dir.length()).replaceAll(DELIMITER, StringUtils.EMPTY))) {
FineFileEntry entry = new FineFileEntry(prefix);
entry.setDirectory(true);
result.add(entry);
}
}
}
@Override
public URL getResource(String path) {
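Illustration only (hypothetical values, not part of this commit): S3 returns the listed prefix itself among the common prefixes, so without the check in the hunk above a directory would appear inside its own listing. The filter strips the dir prefix and every delimiter and keeps only prefixes with a non-empty remainder. A minimal, self-contained sketch of that check:

// Hypothetical sketch of the prefix filter used above; values are made up for illustration.
public class PrefixFilterDemo {
    public static void main(String[] args) {
        String dir = "webroot/";                                   // assumed listing prefix
        for (String prefix : new String[]{"webroot/", "webroot/assets/"}) {
            // Strip the listing prefix and all delimiters; an empty remainder means "the dir itself".
            String rest = prefix.substring(dir.length()).replaceAll("/", "");
            if (!rest.isEmpty()) {
                System.out.println(prefix);                        // prints only "webroot/assets/"
            }
        }
    }
}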
@@ -128,6 +147,8 @@ public class S3ResourceRepository extends BaseResourceRepository {
try {
return s3.getObject(request).getObjectContent();
} catch (Exception e) {
LogKit.error("[S3] Failed to read file {}", filePath);
LogKit.error(e.getMessage(), e);
return new ByteArrayInputStream(new byte[0]);
}
}
@@ -150,6 +171,61 @@ public class S3ResourceRepository extends BaseResourceRepository {
s3.putObject(bucket, path, new ByteArrayInputStream(data), metadata);
}
@Override
public void write(String path, InputStream inputStream) throws ResourceIOException {
long dataLength = 0;
try {
dataLength = inputStream.available();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
//Only use multipart upload above a certain size: for small files the transfer time is short and the risk of a failed upload is relatively low.
//On a stable network, multipart upload brings little benefit for them and only adds extra overhead and complexity.
if (dataLength > MULTIPART_UPLOAD_LIMIT) {
try {
// Step 1: initiate the multipart upload
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, path);
InitiateMultipartUploadResult initResponse = s3.initiateMultipartUpload(initRequest);
String uploadId = initResponse.getUploadId();
// Step 2: upload the file in parts
List<PartETag> partETags = new ArrayList<>();
long position = 0;
for (int partNumber = 1; position < dataLength; partNumber++) {
// The last part may be smaller than 5 MB
long partSizeBytes = Math.min(PART_SIZE, dataLength - position);
byte[] bytes = new byte[(int) partSizeBytes];
inputStream.read(bytes);
// Build the upload request for this part
UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(bucket)
.withKey(path)
.withUploadId(uploadId)
.withPartNumber(partNumber)
.withInputStream(new ByteArrayInputStream(bytes))
.withPartSize(partSizeBytes);
// Upload the part
UploadPartResult uploadResult = s3.uploadPart(uploadRequest);
partETags.add(uploadResult.getPartETag());
position += partSizeBytes;
}
// Step 3: complete the multipart upload
CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucket, path, uploadId, partETags);
s3.completeMultipartUpload(compRequest);
} catch (IOException e) {
throw new ResourceIOException(e);
} finally {
ResourceIOUtils.close(inputStream);
}
} else {
super.write(path, inputStream);
}
}
@Override
public boolean createFile(String path) {
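A hedged side note on the chunking loop above (not part of this commit): InputStream.read(byte[]) may return fewer bytes than requested, and available() is only an estimate of what can be read without blocking. A defensive sketch that fills each part buffer completely, using only the java.io types already imported in this file:

// Hypothetical helper: read exactly len bytes into buf, stopping early only at end of stream.
private static int readFully(InputStream in, byte[] buf, int len) throws IOException {
    int total = 0;
    while (total < len) {
        int read = in.read(buf, total, len - total);
        if (read < 0) {
            break; // end of stream reached
        }
        total += read;
    }
    return total; // number of bytes actually read
}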
@@ -313,7 +389,11 @@ public class S3ResourceRepository extends BaseResourceRepository {
result.add(key.substring(key.lastIndexOf(DELIMITER) + 1));
}
for (String prefix : objectListing.getCommonPrefixes()) {
result.add(prefix.substring(prefix.lastIndexOf(DELIMITER) + 1));
if (StringUtils.isNotEmpty(prefix.substring(dir.length()).replaceAll(DELIMITER, StringUtils.EMPTY))) {
String[] arr = prefix.split(DELIMITER);
String name = arr[arr.length - 1] + DELIMITER;
result.add(name);
}
}
}
@@ -341,6 +421,8 @@ public class S3ResourceRepository extends BaseResourceRepository {
S3Object s3Object = s3.getObject(bucket, path);
if (s3Object != null) {
try {
//The S3Object content must be read in full, otherwise the SDK logs a warning
IOUtils.copy(s3Object.getObjectContent(), new NullOutputStream());
return s3Object.getObjectMetadata().getLastModified().getTime();
} finally {
s3Object.close();
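Side note (an assumption for illustration, not part of this commit): draining the stream above avoids the AWS SDK warning about not reading an object's content in full. When only the timestamp is needed, fetching metadata alone (an HTTP HEAD under the hood) skips the body entirely; a minimal sketch using the AmazonS3 and ObjectMetadata types already imported in this file:

// Hypothetical alternative: fetch only the object's metadata, so there is no body stream to drain.
private long lastModifiedViaHead(AmazonS3 s3, String bucket, String path) {
    ObjectMetadata meta = s3.getObjectMetadata(bucket, path);
    return meta.getLastModified().getTime();
}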

src/main/resources/com/fanruan/fs/s3/repository/web/js/bundle.js (2 changes)

File diff suppressed because one or more lines are too long