Browse Source

Pull request #290: REPORT-96529 && REPORT-97765 && REPORT-98184 fix:list接口数量限制;删除接口性能优化;s3插件支持配置连接池大小;测试连接后释放资源;list接口过滤自身目录

Merge in PG/plugin-repository-s3 from ~AFLY/plugin-repository-s3:release/10.0 to release/10.0

* commit '7d4a1f54e3d93e324e13d68aadf3265a853db257':
  无jira任务 fix: 测试连接后释放资源;list接口过滤自身目录
  REPORT-97765 feat: s3插件支持配置连接池大小
  REPORT-97765 feat: s3插件支持配置连接池大小
  REPORT-96529 fix: list接口数量限制;删除接口性能优化
release/10.0
Afly-储泓飞 2 years ago
parent
commit
5f9e9517c9
  1. 15
      src/main/java/com/fanruan/fs/s3/repository/core/S3Config.java
  2. 7
      src/main/java/com/fanruan/fs/s3/repository/core/S3RepositoryFactory.java
  3. 135
      src/main/java/com/fanruan/fs/s3/repository/core/S3ResourceRepository.java

15
src/main/java/com/fanruan/fs/s3/repository/core/S3Config.java

@ -38,6 +38,9 @@ public class S3Config extends CommonRepoConfig {
@Identifier("signerOverride") @Identifier("signerOverride")
private Conf<String> signerOverride = HolderKit.simple(StringUtils.EMPTY); private Conf<String> signerOverride = HolderKit.simple(StringUtils.EMPTY);
@Identifier("maxConnections")
private Conf<Integer> maxConnections = HolderKit.simple(200);
@GetConfig("endPoint") @GetConfig("endPoint")
public String getEndPoint() { public String getEndPoint() {
return endPoint.get(); return endPoint.get();
@ -98,6 +101,16 @@ public class S3Config extends CommonRepoConfig {
this.signerOverride.set(signerOverride); this.signerOverride.set(signerOverride);
} }
@GetConfig("maxConnections")
public int getMaxConnections() {
return maxConnections.get();
}
@SetConfig("maxConnections")
public void setMaxConnections(int maxConnections) {
this.maxConnections.set(maxConnections);
}
@Override @Override
public void update(String key) { public void update(String key) {
super.update(key); super.update(key);
@ -109,6 +122,7 @@ public class S3Config extends CommonRepoConfig {
this.setBucket(newConfig.getBucket()); this.setBucket(newConfig.getBucket());
this.setEnablePathStyleAccess(newConfig.isEnablePathStyleAccess()); this.setEnablePathStyleAccess(newConfig.isEnablePathStyleAccess());
this.setSignerOverride(newConfig.getSignerOverride()); this.setSignerOverride(newConfig.getSignerOverride());
this.setMaxConnections(newConfig.getMaxConnections());
} }
} }
@ -121,6 +135,7 @@ public class S3Config extends CommonRepoConfig {
cloned.bucket = (Conf<String>) bucket.clone(); cloned.bucket = (Conf<String>) bucket.clone();
cloned.enablePathStyleAccess = (Conf<Boolean>) enablePathStyleAccess.clone(); cloned.enablePathStyleAccess = (Conf<Boolean>) enablePathStyleAccess.clone();
cloned.signerOverride = (Conf<String>) signerOverride.clone(); cloned.signerOverride = (Conf<String>) signerOverride.clone();
cloned.maxConnections = (Conf<Integer>) maxConnections.clone();
return cloned; return cloned;
} }
} }

7
src/main/java/com/fanruan/fs/s3/repository/core/S3RepositoryFactory.java

@ -40,6 +40,7 @@ public class S3RepositoryFactory extends ConfigRepositoryFactory<S3Config> {
@Override @Override
public boolean verifyConfig(S3Config config) { public boolean verifyConfig(S3Config config) {
AmazonS3 s3 = null;
try { try {
BasicAWSCredentials credentials = new BasicAWSCredentials(config.getAccessKeyId(), config.getPassword()); BasicAWSCredentials credentials = new BasicAWSCredentials(config.getAccessKeyId(), config.getPassword());
AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder.standard() AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder.standard()
@ -57,11 +58,15 @@ public class S3RepositoryFactory extends ConfigRepositoryFactory<S3Config> {
clientConfiguration.setProtocol(Protocol.HTTP); clientConfiguration.setProtocol(Protocol.HTTP);
} }
amazonS3ClientBuilder = amazonS3ClientBuilder.withClientConfiguration(clientConfiguration); amazonS3ClientBuilder = amazonS3ClientBuilder.withClientConfiguration(clientConfiguration);
AmazonS3 s3 = amazonS3ClientBuilder.build(); s3 = amazonS3ClientBuilder.build();
s3.listObjects(config.getBucket()); s3.listObjects(config.getBucket());
} catch (Exception e) { } catch (Exception e) {
LogKit.error(e.getMessage(), e); LogKit.error(e.getMessage(), e);
return false; return false;
} finally {
if (s3 != null) {
s3.shutdown();
}
} }
return true; return true;
} }

135
src/main/java/com/fanruan/fs/s3/repository/core/S3ResourceRepository.java

@ -7,6 +7,7 @@ import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectListing;
@ -14,12 +15,14 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.util.IOUtils;
import com.fanruan.api.log.LogKit; import com.fanruan.api.log.LogKit;
import com.fanruan.api.util.StringKit; import com.fanruan.api.util.StringKit;
import com.fr.io.repository.FineFileEntry; import com.fr.io.repository.FineFileEntry;
import com.fr.io.repository.base.BaseResourceRepository; import com.fr.io.repository.base.BaseResourceRepository;
import com.fr.stable.Filter; import com.fr.stable.Filter;
import com.fr.stable.StringUtils; import com.fr.stable.StringUtils;
import com.fr.third.org.apache.commons.io.output.NullOutputStream;
import com.fr.workspace.resource.ResourceIOException; import com.fr.workspace.resource.ResourceIOException;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -36,6 +39,8 @@ import java.util.List;
*/ */
public class S3ResourceRepository extends BaseResourceRepository { public class S3ResourceRepository extends BaseResourceRepository {
private static final int PAGE_SIZE = 1000;
private static final String DELIMITER = "/"; private static final String DELIMITER = "/";
public static final String HTTP = "http:"; public static final String HTTP = "http:";
@ -53,6 +58,8 @@ public class S3ResourceRepository extends BaseResourceRepository {
amazonS3ClientBuilder = amazonS3ClientBuilder.enablePathStyleAccess(); amazonS3ClientBuilder = amazonS3ClientBuilder.enablePathStyleAccess();
} }
ClientConfiguration clientConfiguration = new ClientConfiguration(); ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setMaxConnections(config.getMaxConnections());
LogKit.info("Max connections is {}!", clientConfiguration.getMaxConnections());
if (StringUtils.isNotEmpty(config.getSignerOverride())) { if (StringUtils.isNotEmpty(config.getSignerOverride())) {
clientConfiguration.setSignerOverride(config.getSignerOverride()); clientConfiguration.setSignerOverride(config.getSignerOverride());
} }
@ -92,15 +99,15 @@ public class S3ResourceRepository extends BaseResourceRepository {
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket) ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket)
.withPrefix(dir).withDelimiter(DELIMITER); .withPrefix(dir).withDelimiter(DELIMITER);
ObjectListing objectListing = s3.listObjects(listObjectsRequest); ObjectListing objectListing = s3.listObjects(listObjectsRequest);
collectFileEntry(result, objectListing); collectFileEntry(dir, result, objectListing);
while (objectListing.isTruncated()) { while (objectListing.isTruncated()) {
objectListing = s3.listNextBatchOfObjects(objectListing); objectListing = s3.listNextBatchOfObjects(objectListing);
collectFileEntry(result, objectListing); collectFileEntry(dir, result, objectListing);
} }
return result.toArray(new FineFileEntry[0]); return result.toArray(new FineFileEntry[0]);
} }
private void collectFileEntry(List<FineFileEntry> result, ObjectListing objectListing) { private void collectFileEntry(String dir, List<FineFileEntry> result, ObjectListing objectListing) {
for (S3ObjectSummary summary : objectListing.getObjectSummaries()) { for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
String key = summary.getKey(); String key = summary.getKey();
if (!key.endsWith(DELIMITER)) { if (!key.endsWith(DELIMITER)) {
@ -108,9 +115,11 @@ public class S3ResourceRepository extends BaseResourceRepository {
} }
} }
for (String prefix : objectListing.getCommonPrefixes()) { for (String prefix : objectListing.getCommonPrefixes()) {
FineFileEntry entry = new FineFileEntry(prefix); if (StringUtils.isNotEmpty(prefix.substring(dir.length()).replaceAll(DELIMITER, StringUtils.EMPTY))) {
entry.setDirectory(true); FineFileEntry entry = new FineFileEntry(prefix);
result.add(entry); entry.setDirectory(true);
result.add(entry);
}
} }
} }
@ -125,6 +134,8 @@ public class S3ResourceRepository extends BaseResourceRepository {
try { try {
return s3.getObject(request).getObjectContent(); return s3.getObject(request).getObjectContent();
} catch (Exception e) { } catch (Exception e) {
LogKit.error("[S3] Failed to read file {}", filePath);
LogKit.error(e.getMessage(), e);
return new ByteArrayInputStream(new byte[0]); return new ByteArrayInputStream(new byte[0]);
} }
} }
@ -193,32 +204,49 @@ public class S3ResourceRepository extends BaseResourceRepository {
@Override @Override
public boolean delete(String path) { public boolean delete(String path) {
s3.deleteObject(bucket, path);
String prefix = path;
if (!path.endsWith(DELIMITER)) {
prefix = path + DELIMITER;
}
try { try {
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket) if (isDirectory(path)) {
.withPrefix(prefix).withDelimiter(DELIMITER); if (!path.endsWith(DELIMITER)) {
ObjectListing objectListing = s3.listObjects(listObjectsRequest); path += DELIMITER;
for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
String key = summary.getKey();
if (!key.endsWith(DELIMITER)) {
s3.deleteObject(bucket, key);
} }
} deleteDirectory(path);
for (String pre : objectListing.getCommonPrefixes()) { } else {
delete(pre); deleteFile(path);
} }
} catch (Exception e) { } catch (Exception e) {
LogKit.error(e.getMessage(), e); LogKit.error("[S3] delete {} failed, error message: {}", path, e.getMessage());
return false;
} }
s3.deleteObject(bucket, prefix);
return true; return true;
} }
private void deleteFile(String path) throws Exception {
s3.deleteObject(bucket, path);
}
private void deleteDirectory(String path) throws Exception {
List<String> files = new ArrayList<>();
for (FineFileEntry fineFileEntry : listEntry(path)) {
if (fineFileEntry.isDirectory()) {
deleteDirectory(fineFileEntry.getPath());
} else {
files.add(fineFileEntry.getPath());
}
}
deleteFiles(files);
deleteFile(path);
}
private void deleteFiles(List<String> paths) throws Exception {
//最多只能同时指定1000个key
for (int i = 0; i < paths.size(); i = i + PAGE_SIZE) {
int toIndex = Math.min(i + PAGE_SIZE, paths.size());
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucket)
.withKeys(paths.subList(i, toIndex).toArray(new String[0]));
s3.deleteObjects(deleteObjectsRequest);
}
}
@Override @Override
public boolean exist(String path) { public boolean exist(String path) {
return fileExist(path) || (!path.endsWith(DELIMITER) && dirExist(path)) || isParentPathAbsent(path); return fileExist(path) || (!path.endsWith(DELIMITER) && dirExist(path)) || isParentPathAbsent(path);
@ -273,34 +301,32 @@ public class S3ResourceRepository extends BaseResourceRepository {
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket) ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket)
.withPrefix(dir).withDelimiter(DELIMITER); .withPrefix(dir).withDelimiter(DELIMITER);
ObjectListing objectListing = s3.listObjects(listObjectsRequest); ObjectListing objectListing = s3.listObjects(listObjectsRequest);
for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { collectFileName(dir, result, objectListing);
while (objectListing.isTruncated()) {
objectListing = s3.listNextBatchOfObjects(objectListing);
collectFileName(dir, result, objectListing);
}
if (filter != null) {
return result.stream().filter(filter::accept).toArray(String[]::new);
}
return result.toArray(new String[0]);
}
private void collectFileName(String dir, List<String> result, ObjectListing objectListing) {
for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
String key = objectSummary.getKey(); String key = objectSummary.getKey();
if (StringKit.equals(key, dir)) { if (StringKit.equals(key, dir)) {
continue; continue;
} }
String[] arr = key.split(DELIMITER); result.add(key.substring(key.lastIndexOf(DELIMITER) + 1));
String name = arr[arr.length - 1];
if (filter == null) {
result.add(name);
} else {
if (filter.accept(name)) {
result.add(name);
}
}
} }
for (String prefix : objectListing.getCommonPrefixes()) { for (String prefix : objectListing.getCommonPrefixes()) {
String[] arr = prefix.split(DELIMITER); if (StringUtils.isNotEmpty(prefix.substring(dir.length()).replaceAll(DELIMITER, StringUtils.EMPTY))) {
String name = arr[arr.length - 1] + DELIMITER; String[] arr = prefix.split(DELIMITER);
if (filter == null) { String name = arr[arr.length - 1] + DELIMITER;
result.add(name); result.add(name);
} else {
if (filter.accept(name)) {
result.add(name);
}
} }
} }
return result.toArray(new String[0]);
} }
@Override @Override
@ -327,6 +353,8 @@ public class S3ResourceRepository extends BaseResourceRepository {
S3Object s3Object = s3.getObject(bucket, path); S3Object s3Object = s3.getObject(bucket, path);
if (s3Object != null) { if (s3Object != null) {
try { try {
//s3Object要全部读完,否则会有警告
IOUtils.copy(s3Object.getObjectContent(), new NullOutputStream());
return s3Object.getObjectMetadata().getLastModified().getTime(); return s3Object.getObjectMetadata().getLastModified().getTime();
} finally { } finally {
s3Object.close(); s3Object.close();
@ -406,27 +434,4 @@ public class S3ResourceRepository extends BaseResourceRepository {
return entry; return entry;
} }
/**
* 递归创建父目录的metadata. 比如path是 {@code WEB-INF/reportlets/test/1.cpt}
* 则返回 {@code [WEB-INF/reportlets/test/1.cpt, WEB-INF/reportlets/test/, WEB-INF/reportlets/, WEB-INF/]}
*/
private List<String> getAllNecessaryPath(String path) {
// 获取所有path路径及其所有父路径
List<String> allPath = new ArrayList<>();
allPath.add(path);
int lastIdxOfDelimiter = path.lastIndexOf(DELIMITER);
// 以/结尾的先把末尾的/去掉
if (lastIdxOfDelimiter == path.length() - 1) {
path = path.substring(0, lastIdxOfDelimiter);
lastIdxOfDelimiter = path.lastIndexOf(DELIMITER);
}
while (lastIdxOfDelimiter > 0) {
allPath.add(path.substring(0, lastIdxOfDelimiter + 1));
path = path.substring(0, lastIdxOfDelimiter);
lastIdxOfDelimiter = path.lastIndexOf(DELIMITER);
}
return allPath;
}
} }

Loading…
Cancel
Save