From 428f1559a35ee327acb71e8da04e619b480094ea Mon Sep 17 00:00:00 2001 From: Rick Cheng Date: Mon, 22 May 2023 20:44:43 +0800 Subject: [PATCH] [Improv-14083][Resource Center] Handle deleteSource in OSS / S3 / GCS (#14084) --- docs/docs/en/guide/upgrade/incompatible.md | 1 + docs/docs/zh/guide/upgrade/incompatible.md | 1 + .../api/service/impl/ResourcesServiceImpl.java | 2 +- .../dolphinscheduler/dao/utils/TaskCacheUtils.java | 3 +-- .../plugin/storage/api/StorageOperate.java | 3 +-- .../plugin/storage/gcs/GcsStorageOperator.java | 13 ++++++++++--- .../plugin/storage/gcs/GcsStorageOperatorTest.java | 2 -- .../plugin/storage/hdfs/HdfsStorageOperator.java | 4 ++-- .../plugin/storage/oss/OssStorageOperator.java | 11 ++++++++--- .../plugin/storage/oss/OssStorageOperatorTest.java | 2 -- .../plugin/storage/s3/S3StorageOperator.java | 10 ++++++++-- .../plugin/storage/s3/S3StorageOperatorTest.java | 2 -- .../worker/utils/TaskExecutionCheckerUtils.java | 3 +-- .../server/worker/utils/TaskFilesTransferUtils.java | 3 +-- 14 files changed, 35 insertions(+), 25 deletions(-) diff --git a/docs/docs/en/guide/upgrade/incompatible.md b/docs/docs/en/guide/upgrade/incompatible.md index c3106631af..aacaca8c78 100644 --- a/docs/docs/en/guide/upgrade/incompatible.md +++ b/docs/docs/en/guide/upgrade/incompatible.md @@ -7,6 +7,7 @@ This document records the incompatible updates between each version. You need to * Change regex matching sql params in SQL task plugin ([#13378](https://github.com/apache/dolphinscheduler/pull/13378)) * Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)). * Change the default unix shell executor from sh to bash ([#12180](https://github.com/apache/dolphinscheduler/pull/12180)). +* Remove `deleteSource` in `download()` of `StorageOperate` ([#14084](https://github.com/apache/dolphinscheduler/pull/14084)) ## 3.0.0 diff --git a/docs/docs/zh/guide/upgrade/incompatible.md b/docs/docs/zh/guide/upgrade/incompatible.md index 119f77e2ef..89207000fb 100644 --- a/docs/docs/zh/guide/upgrade/incompatible.md +++ b/docs/docs/zh/guide/upgrade/incompatible.md @@ -7,6 +7,7 @@ * 更新了SQL任务中用于匹配变量的正则表达式 ([#13378](https://github.com/apache/dolphinscheduler/pull/13378)) * Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)). * Change the default unix shell executor from sh to bash ([#12180](https://github.com/apache/dolphinscheduler/pull/12180)). +* Remove `deleteSource` in `download()` of `StorageOperate` ([#14084](https://github.com/apache/dolphinscheduler/pull/14084)) ## 3.0.0 diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index b806421b77..27ed5ced4c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -1462,7 +1462,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe log.info("Resource path is {}, download local filename is {}", alias, localFileName); try { - storageOperate.download(tenantCode, fullName, localFileName, false, true); + storageOperate.download(tenantCode, fullName, localFileName, true); return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName); } catch (IOException e) { log.error("Download resource error, the path is {}, and local filename is {}, the error message is {}", diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java index cbac49e827..838a77808f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java @@ -183,8 +183,7 @@ public class TaskCacheUtils { log.info("{} --- Remote:{} to Local:{}", "CRC file", resourceCRCWholePath, targetPath); String crcString = ""; try { - storageOperate.download(context.getTenantCode(), resourceCRCWholePath, targetPath, false, - true); + storageOperate.download(context.getTenantCode(), resourceCRCWholePath, targetPath, true); crcString = FileUtils.readFile2Str(new FileInputStream(targetPath)); fileProperty.setValue(crcString); } catch (IOException e) { diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java index 13e9ea0b40..5934323afc 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java @@ -140,11 +140,10 @@ public interface StorageOperate { * @param tenantCode * @param srcFilePath the full path of the srcPath * @param dstFile - * @param deleteSource * @param overwrite * @throws IOException */ - void download(String tenantCode, String srcFilePath, String dstFile, boolean deleteSource, + void download(String tenantCode, String srcFilePath, String dstFile, boolean overwrite) throws IOException; /** diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java index 2f190a830f..e5743836b3 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java @@ -144,7 +144,7 @@ public class GcsStorageOperator implements Closeable, StorageOperate { } @Override - public void download(String tenantCode, String srcFilePath, String dstFilePath, boolean deleteSource, + public void download(String tenantCode, String srcFilePath, String dstFilePath, boolean overwrite) throws IOException { File dstFile = new File(dstFilePath); if (dstFile.isDirectory()) { @@ -201,7 +201,9 @@ public class GcsStorageOperator implements Closeable, StorageOperate { .setTarget(target) .build()); - gcsStorage.delete(source); + if (deleteSource) { + gcsStorage.delete(source); + } return true; } @@ -212,7 +214,12 @@ public class GcsStorageOperator implements Closeable, StorageOperate { BlobInfo blobInfo = BlobInfo.newBuilder( BlobId.of(bucketName, dstPath)).build(); - gcsStorage.create(blobInfo, Files.readAllBytes(Paths.get(srcFile))); + Path srcPath = Paths.get(srcFile); + gcsStorage.create(blobInfo, Files.readAllBytes(srcPath)); + + if (deleteSource) { + Files.delete(srcPath); + } return true; } catch (Exception e) { log.error("upload failed,the bucketName is {},the filePath is {}", bucketName, dstPath); diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java index 0cfc583d77..eecde956d7 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java @@ -203,7 +203,6 @@ public class GcsStorageOperatorTest { public void copy() { boolean isSuccess = false; doReturn(null).when(gcsStorage).copy(Mockito.any()); - doReturn(true).when(gcsStorage).delete(Mockito.any(BlobId.class)); try { isSuccess = gcsStorageOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false); } catch (IOException e) { @@ -212,7 +211,6 @@ public class GcsStorageOperatorTest { Assertions.assertTrue(isSuccess); verify(gcsStorage, times(1)).copy(Mockito.any()); - verify(gcsStorage, times(1)).delete(Mockito.any(BlobId.class)); } @Test diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java index 9c3bb73e14..70cf709eb0 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java @@ -312,9 +312,9 @@ public class HdfsStorageOperator implements Closeable, StorageOperate { } @Override - public void download(String tenantCode, String srcHdfsFilePath, String dstFile, boolean deleteSource, + public void download(String tenantCode, String srcHdfsFilePath, String dstFile, boolean overwrite) throws IOException { - copyHdfsToLocal(srcHdfsFilePath, dstFile, deleteSource, overwrite); + copyHdfsToLocal(srcHdfsFilePath, dstFile, false, overwrite); } /** diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java index c042014c81..260d4c7b59 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java @@ -214,7 +214,7 @@ public class OssStorageOperator implements Closeable, StorageOperate { } @Override - public void download(String tenantCode, String srcFilePath, String dstFilePath, boolean deleteSource, + public void download(String tenantCode, String srcFilePath, String dstFilePath, boolean overwrite) throws IOException { File dstFile = new File(dstFilePath); if (dstFile.isDirectory()) { @@ -234,7 +234,7 @@ public class OssStorageOperator implements Closeable, StorageOperate { } catch (OSSException e) { throw new IOException(e); } catch (FileNotFoundException e) { - log.error("cannot fin the destination file {}", dstFilePath); + log.error("cannot find the destination file {}", dstFilePath); throw e; } } @@ -258,7 +258,9 @@ public class OssStorageOperator implements Closeable, StorageOperate { @Override public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException { ossClient.copyObject(bucketName, srcPath, bucketName, dstPath); - ossClient.deleteObject(bucketName, srcPath); + if (deleteSource) { + ossClient.deleteObject(bucketName, srcPath); + } return true; } @@ -281,6 +283,9 @@ public class OssStorageOperator implements Closeable, StorageOperate { boolean overwrite) throws IOException { try { ossClient.putObject(bucketName, dstPath, new File(srcFile)); + if (deleteSource) { + Files.delete(Paths.get(srcFile)); + } return true; } catch (OSSException e) { log.error("upload failed, the bucketName is {}, the filePath is {}", bucketName, dstPath, e); diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java index e0fccd9ad2..b84df0a024 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java @@ -215,7 +215,6 @@ public class OssStorageOperatorTest { public void copy() { boolean isSuccess = false; doReturn(null).when(ossClientMock).copyObject(anyString(), anyString(), anyString(), anyString()); - doReturn(null).when(ossClientMock).deleteObject(anyString(), anyString()); try { isSuccess = ossOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false); } catch (IOException e) { @@ -224,7 +223,6 @@ public class OssStorageOperatorTest { Assertions.assertTrue(isSuccess); verify(ossClientMock, times(1)).copyObject(anyString(), anyString(), anyString(), anyString()); - verify(ossClientMock, times(1)).deleteObject(anyString(), anyString()); } @Test diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java index 9ffaffc2ae..781119c237 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java @@ -202,7 +202,7 @@ public class S3StorageOperator implements Closeable, StorageOperate { } @Override - public void download(String tenantCode, String srcFilePath, String dstFilePath, boolean deleteSource, + public void download(String tenantCode, String srcFilePath, String dstFilePath, boolean overwrite) throws IOException { File dstFile = new File(dstFilePath); if (dstFile.isDirectory()) { @@ -263,7 +263,9 @@ public class S3StorageOperator implements Closeable, StorageOperate { @Override public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException { s3Client.copyObject(bucketName, srcPath, bucketName, dstPath); - s3Client.deleteObject(bucketName, srcPath); + if (deleteSource) { + s3Client.deleteObject(bucketName, srcPath); + } return true; } @@ -287,6 +289,10 @@ public class S3StorageOperator implements Closeable, StorageOperate { boolean overwrite) throws IOException { try { s3Client.putObject(bucketName, dstPath, new File(srcFile)); + + if (deleteSource) { + Files.delete(Paths.get(srcFile)); + } return true; } catch (AmazonServiceException e) { log.error("upload failed,the bucketName is {},the filePath is {}", bucketName, dstPath); diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java index 14541026a2..0c3f75d861 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java @@ -219,7 +219,6 @@ public class S3StorageOperatorTest { public void copy() { boolean isSuccess = false; doReturn(null).when(s3Client).copyObject(anyString(), anyString(), anyString(), anyString()); - doNothing().when(s3Client).deleteObject(anyString(), anyString()); try { isSuccess = s3StorageOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false); } catch (IOException e) { @@ -228,7 +227,6 @@ public class S3StorageOperatorTest { Assertions.assertTrue(isSuccess); verify(s3Client, times(1)).copyObject(anyString(), anyString(), anyString(), anyString()); - verify(s3Client, times(1)).deleteObject(anyString(), anyString()); } @Test diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java index 741d0d568c..99d9443ef4 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java @@ -142,8 +142,7 @@ public class TaskExecutionCheckerUtils { long resourceDownloadStartTime = System.currentTimeMillis(); storageOperate.download(actualTenant, fullName, - execLocalPath + File.separator + fileName, false, - true); + execLocalPath + File.separator + fileName, true); WorkerServerMetrics .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime); WorkerServerMetrics.recordWorkerResourceDownloadSize( diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java index 1576c65af9..e591b3610e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java @@ -178,8 +178,7 @@ public class TaskFilesTransferUtils { String resourceWholePath = storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath); log.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath); - storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false, - true); + storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, true); } catch (IOException ex) { throw new TaskException("Download file from storage error", ex); }