From 1e3ae7b9427ae0c17ea0f4e8d091ed24d2660835 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Mon, 15 May 2023 14:02:30 +0800 Subject: [PATCH] [improv][Resource Center] Implement getResourceFileName in StorageOperator (#14097) --- .../service/impl/ResourcesServiceImpl.java | 2 +- .../dao/utils/TaskCacheUtils.java | 2 +- .../plugin/storage/api/StorageOperate.java | 10 ++++---- .../storage/gcs/GcsStorageOperator.java | 7 +++--- .../storage/gcs/GcsStorageOperatorTest.java | 13 ++++++++-- .../storage/hdfs/HdfsStorageOperator.java | 24 ++++--------------- .../storage/oss/OssStorageOperator.java | 7 +++--- .../storage/oss/OssStorageOperatorTest.java | 13 ++++++++-- .../plugin/storage/s3/S3StorageOperator.java | 24 ++++--------------- .../storage/s3/S3StorageOperatorTest.java | 13 ++++++++-- .../utils/TaskExecutionCheckerUtils.java | 14 +++++------ .../worker/utils/TaskFilesTransferUtils.java | 6 ++--- 12 files changed, 66 insertions(+), 69 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index cd8d2cd1ea..b806421b77 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -1534,7 +1534,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe String tenantCode = getTenantCode(user); - String baseFolder = storageOperate.getResourceFileName(tenantCode, "DATA_TRANSFER"); + String baseFolder = storageOperate.getResourceFullName(tenantCode, "DATA_TRANSFER"); LocalDateTime now = LocalDateTime.now(); now = now.minus(days, ChronoUnit.DAYS); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java index 0b35e58d53..cbac49e827 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java @@ -178,7 +178,7 @@ public class TaskCacheUtils { public static String getValCheckSum(Property fileProperty, TaskExecutionContext context, StorageOperate storageOperate) { String resourceCRCPath = fileProperty.getValue() + CRC_SUFFIX; - String resourceCRCWholePath = storageOperate.getResourceFileName(context.getTenantCode(), resourceCRCPath); + String resourceCRCWholePath = storageOperate.getResourceFullName(context.getTenantCode(), resourceCRCPath); String targetPath = String.format("%s/%s", context.getExecutePath(), resourceCRCPath); log.info("{} --- Remote:{} to Local:{}", "CRC file", resourceCRCWholePath, targetPath); String crcString = ""; diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java index f000442d74..13e9ea0b40 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java @@ -60,19 +60,19 @@ public interface StorageOperate { boolean mkdir(String tenantCode, String path) throws IOException; /** - * get the path of the resource file + * get the path of the resource file (fullName) * @param tenantCode - * @param fullName + * @param fileName * @return */ - String getResourceFileName(String tenantCode, String fullName); + String getResourceFullName(String tenantCode, String fileName); /** - * get the path of the resource file excluding the base path. + * get the path of the resource file excluding the base path (fileName) * @param fullName * @return */ - String getResourceFileName(String fullName); + String getResourceFileName(String tenantCode, String fullName); /** * get the path of the file diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java index 4a2f386160..2f190a830f 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java @@ -122,7 +122,7 @@ public class GcsStorageOperator implements Closeable, StorageOperate { } @Override - public String getResourceFileName(String tenantCode, String fileName) { + public String getResourceFullName(String tenantCode, String fileName) { if (fileName.startsWith(FOLDER_SEPARATOR)) { fileName.replaceFirst(FOLDER_SEPARATOR, EMPTY_STRING); } @@ -130,8 +130,9 @@ public class GcsStorageOperator implements Closeable, StorageOperate { } @Override - public String getResourceFileName(String fullName) { - return null; + public String getResourceFileName(String tenantCode, String fullName) { + String resDir = getResDir(tenantCode); + return fullName.replaceFirst(resDir, ""); } @Override diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java index 6da3384cdd..0cfc583d77 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java @@ -146,11 +146,20 @@ public class GcsStorageOperatorTest { Assertions.assertTrue(isSuccess); } + @Test + public void getResourceFullName() { + final String expectedResourceFullName = + String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String resourceFullName = gcsStorageOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK); + Assertions.assertEquals(expectedResourceFullName, resourceFullName); + } + @Test public void getResourceFileName() { - final String expectedResourceFileName = + final String expectedResourceFileName = FILE_NAME_MOCK; + final String resourceFullName = String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); - final String resourceFileName = gcsStorageOperator.getResourceFileName(TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String resourceFileName = gcsStorageOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName); Assertions.assertEquals(expectedResourceFileName, resourceFileName); } diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java index 79b925fd56..405c081472 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java @@ -56,7 +56,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.LinkedList; @@ -69,7 +68,6 @@ import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -298,28 +296,14 @@ public class HdfsStorageOperator implements Closeable, StorageOperate { } @Override - public String getResourceFileName(String tenantCode, String fullName) { + public String getResourceFullName(String tenantCode, String fullName) { return getHdfsResourceFileName(tenantCode, fullName); } @Override - public String getResourceFileName(String fullName) { - // here is a quick fix here to get fileName. We get the resource upload path and - // get the index of the first appearance of resource upload path. The index is put - // in the start index of the substring function and get the result substring containing - // tenantcode and "resource" directory and the fileName. - // Then we split the result substring - // with "/" and join all elements except the first two elements because they are - // tenantCode and "resource" directory. - String resourceUploadPath = - RESOURCE_UPLOAD_PATH.endsWith(FOLDER_SEPARATOR) ? StringUtils.chop(RESOURCE_UPLOAD_PATH) - : RESOURCE_UPLOAD_PATH; - // +1 because we want to skip the "/" after resource upload path as well. - String pathContainingTenantNResource = fullName.substring( - fullName.indexOf(resourceUploadPath) - + resourceUploadPath.length() + 1); - String[] fileNameArr = pathContainingTenantNResource.split(FOLDER_SEPARATOR); - return Joiner.on(FOLDER_SEPARATOR).join(Arrays.stream(fileNameArr).skip(2).collect(Collectors.toList())); + public String getResourceFileName(String tenantCode, String fullName) { + String resDir = getResDir(tenantCode); + return fullName.replaceFirst(resDir, ""); } @Override diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java index be4b158f63..c042014c81 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java @@ -175,7 +175,7 @@ public class OssStorageOperator implements Closeable, StorageOperate { } @Override - public String getResourceFileName(String tenantCode, String fileName) { + public String getResourceFullName(String tenantCode, String fileName) { if (fileName.startsWith(FOLDER_SEPARATOR)) { fileName = fileName.replaceFirst(FOLDER_SEPARATOR, ""); } @@ -183,8 +183,9 @@ public class OssStorageOperator implements Closeable, StorageOperate { } @Override - public String getResourceFileName(String fullName) { - return null; + public String getResourceFileName(String tenantCode, String fullName) { + String resDir = getResDir(tenantCode); + return fullName.replaceFirst(resDir, ""); } @Override diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java index 6fc981e092..e0fccd9ad2 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java @@ -158,11 +158,20 @@ public class OssStorageOperatorTest { Assertions.assertTrue(isSuccess); } + @Test + public void getResourceFullName() { + final String expectedResourceFullName = + String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String resourceFullName = ossOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK); + Assertions.assertEquals(expectedResourceFullName, resourceFullName); + } + @Test public void getResourceFileName() { - final String expectedResourceFileName = + final String expectedResourceFileName = FILE_NAME_MOCK; + final String resourceFullName = String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); - final String resourceFileName = ossOperator.getResourceFileName(TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String resourceFileName = ossOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName); Assertions.assertEquals(expectedResourceFileName, resourceFileName); } diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java index ab95187e7b..9ffaffc2ae 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java @@ -46,7 +46,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -76,7 +75,6 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.transfer.MultipleFileDownload; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerBuilder; -import com.google.common.base.Joiner; @Slf4j @Data @@ -182,7 +180,7 @@ public class S3StorageOperator implements Closeable, StorageOperate { } @Override - public String getResourceFileName(String tenantCode, String fileName) { + public String getResourceFullName(String tenantCode, String fileName) { if (fileName.startsWith(FOLDER_SEPARATOR)) { fileName = fileName.replaceFirst(FOLDER_SEPARATOR, ""); } @@ -190,23 +188,9 @@ public class S3StorageOperator implements Closeable, StorageOperate { } @Override - public String getResourceFileName(String fullName) { - // here is a quick fix here to get fileName. We get the resource upload path and - // get the index of the first appearance of resource upload path. The index is put - // in the start index of the substring function and get the result substring containing - // tenantcode and "resource" directory and the fileName. - // Then we split the result substring - // with "/" and join all elements except the first two elements because they are - // tenantCode and "resource" directory. - String resourceUploadPath = - RESOURCE_UPLOAD_PATH.endsWith(FOLDER_SEPARATOR) ? StringUtils.chop(RESOURCE_UPLOAD_PATH) - : RESOURCE_UPLOAD_PATH; - // +1 because we want to skip the "/" after resource upload path as well. - String pathContainingTenantNResource = fullName.substring( - fullName.indexOf(resourceUploadPath) - + resourceUploadPath.length() + 1); - String[] fileNameArr = pathContainingTenantNResource.split(FOLDER_SEPARATOR); - return Joiner.on(FOLDER_SEPARATOR).join(Arrays.stream(fileNameArr).skip(2).collect(Collectors.toList())); + public String getResourceFileName(String tenantCode, String fullName) { + String resDir = getResDir(tenantCode); + return fullName.replaceFirst(resDir, ""); } @Override diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java index c94dfd44ec..14541026a2 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java @@ -164,11 +164,20 @@ public class S3StorageOperatorTest { Assertions.assertTrue(isSuccess); } + @Test + public void getResourceFullName() { + final String expectedResourceFullName = + String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String resourceFullName = s3StorageOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK); + Assertions.assertEquals(expectedResourceFullName, resourceFullName); + } + @Test public void getResourceFileName() { - final String expectedResourceFileName = + final String expectedResourceFileName = FILE_NAME_MOCK; + final String resourceFullName = String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); - final String resourceFileName = s3StorageOperator.getResourceFileName(TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String resourceFileName = s3StorageOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName); Assertions.assertEquals(expectedResourceFileName, resourceFileName); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java index eb77a4182b..1ae02fdd7e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java @@ -109,16 +109,19 @@ public class TaskExecutionCheckerUtils { public static void downloadResourcesIfNeeded(StorageOperate storageOperate, TaskExecutionContext taskExecutionContext) { String execLocalPath = taskExecutionContext.getExecutePath(); + String tenant = taskExecutionContext.getTenantCode(); Map projectRes = taskExecutionContext.getResources(); if (MapUtils.isEmpty(projectRes)) { return; } List> downloadFiles = new ArrayList<>(); - projectRes.forEach((key, value) -> { - File resFile = new File(execLocalPath, key); + projectRes.keySet().forEach(fullName -> { + String fileName = storageOperate.getResourceFileName(tenant, fullName); + projectRes.put(fullName, fileName); + File resFile = new File(execLocalPath, fileName); boolean notExist = !resFile.exists(); if (notExist) { - downloadFiles.add(Pair.of(key, value)); + downloadFiles.add(Pair.of(fullName, fileName)); } else { log.info("file : {} exists ", resFile.getName()); } @@ -131,10 +134,7 @@ public class TaskExecutionCheckerUtils { for (Pair fileDownload : downloadFiles) { try { String fullName = fileDownload.getLeft(); - // we do not actually get & need tenantCode with this implementation right now. - String tenantCode = fileDownload.getRight(); - // TODO: Need a better way to get fileName because this implementation is tricky. - String fileName = storageOperate.getResourceFileName(fullName); + String fileName = fileDownload.getRight(); log.info("get resource file from path:{}", fullName); long resourceDownloadStartTime = System.currentTimeMillis(); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java index daa2af5db5..1576c65af9 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java @@ -101,9 +101,9 @@ public class TaskFilesTransferUtils { try { // upload file to storage String resourceWholePath = - storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath); + storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath); String resourceCRCWholePath = - storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourceCRCPath); + storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourceCRCPath); log.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath); storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true); log.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath); @@ -176,7 +176,7 @@ public class TaskFilesTransferUtils { try { String resourceWholePath = - storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath); + storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath); log.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath); storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false, true);