diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index 430e84077f..70d4a90ac1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -714,6 +714,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe String defaultPath = ""; List resourcesList = new ArrayList<>(); + String resourceStorageType = + PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE, ResUploadType.NONE.name()); if (isAdmin(loginUser) && StringUtils.isBlank(fullName)) { // list all tenants' resources to admin users in the root directory @@ -735,7 +737,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe logger.error(e.getMessage() + " Resource path: {}", defaultPath, e); putMsg(result, Status.RESOURCE_NOT_EXIST); throw new ServiceException(String.format(e.getMessage() + - " make sure resource path: %s exists in hdfs", defaultPath)); + " make sure resource path: %s exists in %s", defaultPath, resourceStorageType)); } } } @@ -755,7 +757,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe logger.error(e.getMessage() + " Resource path: {}", fullName, e); putMsg(result, Status.RESOURCE_NOT_EXIST); throw new ServiceException(String.format(e.getMessage() + - " make sure resource path: %s exists in hdfs", defaultPath)); + " make sure resource path: %s exists in %s", defaultPath, resourceStorageType)); } } diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java index f2d2f256cc..dcb7d32567 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java @@ -44,7 +44,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -56,8 +60,13 @@ import org.slf4j.LoggerFactory; import com.aliyun.oss.OSS; import com.aliyun.oss.OSSException; +import com.aliyun.oss.ServiceException; import com.aliyun.oss.model.Bucket; +import com.aliyun.oss.model.DeleteObjectsRequest; +import com.aliyun.oss.model.ListObjectsV2Request; +import com.aliyun.oss.model.ListObjectsV2Result; import com.aliyun.oss.model.OSSObject; +import com.aliyun.oss.model.OSSObjectSummary; import com.aliyun.oss.model.ObjectMetadata; import com.aliyun.oss.model.PutObjectRequest; @@ -190,8 +199,20 @@ public class OssStorageOperator implements Closeable, StorageOperate { } @Override - public boolean delete(String filePath, List childrenPathArray, boolean recursive) throws IOException { - return false; + public boolean delete(String fullName, List childrenPathList, boolean recursive) throws IOException { + // append the resource fullName to the list for deletion. + childrenPathList.add(fullName); + + DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName) + .withKeys(childrenPathList); + try { + ossClient.deleteObjects(deleteObjectsRequest); + } catch (Exception e) { + logger.error("delete objects error", e); + return false; + } + + return true; } @Override @@ -288,19 +309,168 @@ public class OssStorageOperator implements Closeable, StorageOperate { @Override public List listFilesStatusRecursively(String path, String defaultPath, String tenantCode, ResourceType type) { - return null; + List storageEntityList = new ArrayList<>(); + LinkedList foldersToFetch = new LinkedList<>(); + + StorageEntity initialEntity = null; + try { + initialEntity = getFileStatus(path, defaultPath, tenantCode, type); + } catch (Exception e) { + logger.error("error while listing files status recursively, path: {}", path, e); + return storageEntityList; + } + foldersToFetch.add(initialEntity); + + while (!foldersToFetch.isEmpty()) { + String pathToExplore = foldersToFetch.pop().getFullName(); + try { + List tempList = listFilesStatus(pathToExplore, defaultPath, tenantCode, type); + for (StorageEntity temp : tempList) { + if (temp.isDirectory()) { + foldersToFetch.add(temp); + } + } + storageEntityList.addAll(tempList); + } catch (Exception e) { + logger.error("error while listing files status recursively, path: {}", pathToExplore, e); + } + } + + return storageEntityList; } @Override public List listFilesStatus(String path, String defaultPath, String tenantCode, ResourceType type) throws Exception { - return null; + List storageEntityList = new ArrayList<>(); + + ListObjectsV2Result result = null; + String nextContinuationToken = null; + do { + try { + ListObjectsV2Request request = new ListObjectsV2Request(); + request.setBucketName(bucketName); + request.setPrefix(path); + request.setDelimiter(FOLDER_SEPARATOR); + request.setContinuationToken(nextContinuationToken); + + result = ossClient.listObjectsV2(request); + } catch (Exception e) { + throw new ServiceException("Get OSS file list exception", e); + } + + List summaries = result.getObjectSummaries(); + + for (OSSObjectSummary summary : summaries) { + if (!summary.getKey().endsWith(FOLDER_SEPARATOR)) { + // the path is a file + String[] aliasArr = summary.getKey().split(FOLDER_SEPARATOR); + String alias = aliasArr[aliasArr.length - 1]; + String fileName = StringUtils.difference(defaultPath, summary.getKey()); + + StorageEntity entity = new StorageEntity(); + entity.setAlias(alias); + entity.setFileName(fileName); + entity.setFullName(summary.getKey()); + entity.setDirectory(false); + entity.setDescription(""); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(summary.getSize()); + entity.setCreateTime(summary.getLastModified()); + entity.setUpdateTime(summary.getLastModified()); + entity.setPfullName(path); + + storageEntityList.add(entity); + } + } + + for (String commonPrefix : result.getCommonPrefixes()) { + // the paths in commonPrefix are directories + String suffix = StringUtils.difference(path, commonPrefix); + String fileName = StringUtils.difference(defaultPath, commonPrefix); + + StorageEntity entity = new StorageEntity(); + entity.setAlias(suffix); + entity.setFileName(fileName); + entity.setFullName(commonPrefix); + entity.setDirectory(true); + entity.setDescription(""); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(0); + entity.setCreateTime(null); + entity.setUpdateTime(null); + entity.setPfullName(path); + + storageEntityList.add(entity); + } + + nextContinuationToken = result.getNextContinuationToken(); + } while (result.isTruncated()); + + return storageEntityList; } @Override public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode, ResourceType type) throws Exception { - return null; + ListObjectsV2Request request = new ListObjectsV2Request(); + request.setBucketName(bucketName); + request.setPrefix(path); + request.setDelimiter(FOLDER_SEPARATOR); + + ListObjectsV2Result result; + try { + result = ossClient.listObjectsV2(request); + } catch (Exception e) { + throw new ServiceException("Get OSS file list exception", e); + } + + List summaries = result.getObjectSummaries(); + + if (path.endsWith(FOLDER_SEPARATOR)) { + // the path is a directory that may or may not exist in OSS + String alias = findDirAlias(path); + String fileName = StringUtils.difference(defaultPath, path); + + StorageEntity entity = new StorageEntity(); + entity.setAlias(alias); + entity.setFileName(fileName); + entity.setFullName(path); + entity.setDirectory(true); + entity.setDescription(""); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(0); + + return entity; + + } else { + // the path is a file + if (summaries.size() > 0) { + OSSObjectSummary summary = summaries.get(0); + String[] aliasArr = summary.getKey().split(FOLDER_SEPARATOR); + String alias = aliasArr[aliasArr.length - 1]; + String fileName = StringUtils.difference(defaultPath, summary.getKey()); + + StorageEntity entity = new StorageEntity(); + entity.setAlias(alias); + entity.setFileName(fileName); + entity.setFullName(summary.getKey()); + entity.setDirectory(false); + entity.setDescription(""); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(summary.getSize()); + entity.setCreateTime(summary.getLastModified()); + entity.setUpdateTime(summary.getLastModified()); + + return entity; + } + } + + throw new FileNotFoundException("Object is not found in OSS Bucket: " + bucketName); } @Override @@ -360,4 +530,13 @@ public class OssStorageOperator implements Closeable, StorageOperate { protected OSS buildOssClient() { return OssClientFactory.buildOssClient(ossConnection); } + + private String findDirAlias(String dirPath) { + if (!dirPath.endsWith(FOLDER_SEPARATOR)) { + return dirPath; + } + + Path path = Paths.get(dirPath); + return path.getName(path.getNameCount() - 1) + FOLDER_SEPARATOR; + } } diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java index eb59c5f667..6fc981e092 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java @@ -27,18 +27,24 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; import org.apache.dolphinscheduler.spi.enums.ResourceType; import java.io.IOException; +import java.util.Collections; +import java.util.List; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import com.aliyun.oss.OSS; +import com.aliyun.oss.model.ListObjectsV2Request; +import com.aliyun.oss.model.ListObjectsV2Result; @ExtendWith(MockitoExtension.class) public class OssStorageOperatorTest { @@ -53,6 +59,10 @@ public class OssStorageOperatorTest { private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK"; private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK"; + private static final String FULL_NAME = "/tmp/dir1/"; + + private static final String DEFAULT_PATH = "/tmp/"; + @Mock private OSS ossClientMock; @@ -248,4 +258,34 @@ public class OssStorageOperatorTest { ossOperator.deleteDir(DIR_MOCK); verify(ossClientMock, times(1)).deleteObject(anyString(), anyString()); } + + @Test + public void testGetFileStatus() throws Exception { + doReturn(new ListObjectsV2Result()).when(ossClientMock).listObjectsV2(Mockito.any(ListObjectsV2Request.class)); + StorageEntity entity = ossOperator.getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); + Assertions.assertEquals(FULL_NAME, entity.getFullName()); + Assertions.assertEquals("dir1/", entity.getFileName()); + } + + @Test + public void testListFilesStatus() throws Exception { + doReturn(new ListObjectsV2Result()).when(ossClientMock).listObjectsV2(Mockito.any(ListObjectsV2Request.class)); + List result = + ossOperator.listFilesStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); + Assertions.assertEquals(0, result.size()); + } + + @Test + public void testListFilesStatusRecursively() throws Exception { + StorageEntity entity = new StorageEntity(); + entity.setFullName(FULL_NAME); + + doReturn(entity).when(ossOperator).getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); + doReturn(Collections.EMPTY_LIST).when(ossOperator).listFilesStatus(anyString(), anyString(), anyString(), + Mockito.any(ResourceType.class)); + + List result = + ossOperator.listFilesStatusRecursively(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); + Assertions.assertEquals(0, result.size()); + } } diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java index 81c0d2f7e6..b68ce663a3 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java @@ -43,6 +43,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -409,34 +411,31 @@ public class S3StorageOperator implements Closeable, StorageOperate { public List listFilesStatusRecursively(String path, String defaultPath, String tenantCode, ResourceType type) { List storageEntityList = new ArrayList<>(); - LinkedList foldersToFetch = new LinkedList<>(); - do { - String pathToExplore = ""; - if (foldersToFetch.size() == 0) { - pathToExplore = path; - } else { - pathToExplore = foldersToFetch.pop().getFullName(); - } + StorageEntity initialEntity = null; + try { + initialEntity = getFileStatus(path, defaultPath, tenantCode, type); + } catch (Exception e) { + logger.error("error while listing files status recursively, path: {}", path, e); + return storageEntityList; + } + foldersToFetch.add(initialEntity); + while (!foldersToFetch.isEmpty()) { + String pathToExplore = foldersToFetch.pop().getFullName(); try { List tempList = listFilesStatus(pathToExplore, defaultPath, tenantCode, type); - for (StorageEntity temp : tempList) { if (temp.isDirectory()) { foldersToFetch.add(temp); } } - storageEntityList.addAll(tempList); - } catch (AmazonServiceException e) { - logger.error("Resource path: {}", pathToExplore, e); - // return the resources fetched before error occurs. - return storageEntityList; + } catch (Exception e) { + logger.error("error while listing files status recursively, path: {}", pathToExplore, e); } - - } while (foldersToFetch.size() != 0); + } return storageEntityList; @@ -585,16 +584,12 @@ public class S3StorageOperator implements Closeable, StorageOperate { * a directory is a path ending with "/" */ private String findDirAlias(String myStr) { - if (!myStr.endsWith("/")) { + if (!myStr.endsWith(FOLDER_SEPARATOR)) { // Make sure system won't crush down if someone accidentally misuse the function. return myStr; } - int lastIndex = myStr.lastIndexOf("/"); - String subbedString = myStr.substring(0, lastIndex); - int secondLastIndex = subbedString.lastIndexOf("/"); - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(myStr, secondLastIndex + 1, lastIndex + 1); - return stringBuilder.toString(); + Path path = Paths.get(myStr); + return path.getName(path.getNameCount() - 1) + FOLDER_SEPARATOR; } } diff --git a/dolphinscheduler-ui/src/locales/zh_CN/resource.ts b/dolphinscheduler-ui/src/locales/zh_CN/resource.ts index 5797c6d57b..cc10612cf3 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/resource.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/resource.ts @@ -24,6 +24,7 @@ export default { enter_keyword_tips: '请输入关键词', name: '名称', user_name: '所属用户', + tenant_name: '所属租户', whether_directory: '是否文件夹', file_name: '文件名称', description: '描述', @@ -58,6 +59,7 @@ export default { upload_udf_resources: '上传UDF资源', udf_source_name: 'UDF资源名称', user_name: '所属用户', + tenant_name: '所属租户', whether_directory: '是否文件夹', file_name: '文件名称', file_size: '文件大小',