diff --git a/deploy/kubernetes/dolphinscheduler/values.yaml b/deploy/kubernetes/dolphinscheduler/values.yaml index cc17f54055..27eaab94f2 100644 --- a/deploy/kubernetes/dolphinscheduler/values.yaml +++ b/deploy/kubernetes/dolphinscheduler/values.yaml @@ -104,7 +104,7 @@ conf: # user data local directory path, please make sure the directory exists and have read write permissions data.basedir.path: /tmp/dolphinscheduler - # resource storage type: HDFS, S3, OSS, GCS, NONE + # resource storage type: HDFS, S3, OSS, GCS, ABS, NONE resource.storage.type: S3 # resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index 4a03a58139..5751e027f5 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -197,7 +197,7 @@ The default configuration is as follows: | Parameters | Default value | Description | |--|--|--| |data.basedir.path | /tmp/dolphinscheduler | local directory used to store temp files| -|resource.storage.type | NONE | type of resource files: HDFS, S3, OSS, GCS, NONE| +|resource.storage.type | NONE | type of resource files: HDFS, S3, OSS, GCS, ABS, NONE| |resource.upload.path | /dolphinscheduler | storage path of resource files| |aws.access.key.id | minioadmin | access key id of S3| |aws.secret.access.key | minioadmin | secret access key of S3| diff --git a/docs/docs/en/guide/installation/kubernetes.md b/docs/docs/en/guide/installation/kubernetes.md index c2323b51af..fcf2833ca8 100644 --- a/docs/docs/en/guide/installation/kubernetes.md +++ b/docs/docs/en/guide/installation/kubernetes.md @@ -553,7 +553,7 @@ common: | | | | | `common.configmap.DOLPHINSCHEDULER_OPTS` | The jvm options for dolphinscheduler, suitable for all servers | `""` | | `common.configmap.DATA_BASEDIR_PATH` | User data directory path, self configuration, please make sure the directory exists and have read write permissions | `/tmp/dolphinscheduler` | -| `common.configmap.RESOURCE_STORAGE_TYPE` | Resource storage type: HDFS, S3, OSS, GCS, NONE | `HDFS` | +| `common.configmap.RESOURCE_STORAGE_TYPE` | Resource storage type: HDFS, S3, OSS, GCS, ABS, NONE | `HDFS` | | `common.configmap.RESOURCE_UPLOAD_PATH` | Resource store on HDFS/S3 path, please make sure the directory exists on hdfs and have read write permissions | `/dolphinscheduler` | | `common.configmap.FS_DEFAULT_FS` | Resource storage file system like `file:///`, `hdfs://mycluster:8020` or `s3a://dolphinscheduler` | `file:///` | | `common.configmap.FS_S3A_ENDPOINT` | S3 endpoint when `common.configmap.RESOURCE_STORAGE_TYPE` is set to `S3` | `s3.xxx.amazonaws.com` | diff --git a/docs/docs/en/guide/remote-logging.md b/docs/docs/en/guide/remote-logging.md index 421b4b50a0..a29dc06582 100644 --- a/docs/docs/en/guide/remote-logging.md +++ b/docs/docs/en/guide/remote-logging.md @@ -61,3 +61,19 @@ remote.logging.google.cloud.storage.credential=/path/to/credential remote.logging.google.cloud.storage.bucket.name= ``` +## Writing task logs to [Azure Blob Storage (ABS)](https://azure.microsoft.com/en-us/products/storage/blobs) + +Configure `common.properties` as follows: + +```properties +# abs container name, required if you set resource.storage.type=ABS +resource.azure.blob.storage.container.name= +# abs account name, required if you set resource.storage.type=ABS +resource.azure.blob.storage.account.name= +# abs connection string, required if you set resource.storage.type=ABS +resource.azure.blob.storage.connection.string= +``` + +### Notice + +Since Azure Blob Storage does not support the existence of empty directories, there will be empty files `` under the resource directory. But it does not affect the file display on the Dolphinscheduler resource center. diff --git a/docs/docs/en/guide/resource/configuration.md b/docs/docs/en/guide/resource/configuration.md index 9b756b7736..d8e13770b2 100644 --- a/docs/docs/en/guide/resource/configuration.md +++ b/docs/docs/en/guide/resource/configuration.md @@ -80,7 +80,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: HDFS, S3, OSS, GCS, NONE +# resource storage type: HDFS, S3, OSS, GCS, ABS, NONE resource.storage.type=NONE # resource store on HDFS/S3/OSS path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended resource.storage.upload.base.path=/tmp/dolphinscheduler diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index 9ae5e7bfab..be66fe99fd 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -194,7 +194,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId | 参数 | 默认值 | 描述 | |--|--|--| |data.basedir.path | /tmp/dolphinscheduler | 本地工作目录,用于存放临时文件| -|resource.storage.type | NONE | 资源文件存储类型: HDFS,S3,OSS,GCS,NONE| +|resource.storage.type | NONE | 资源文件存储类型: HDFS,S3,OSS,GCS,ABS,NONE| |resource.upload.path | /dolphinscheduler | 资源文件存储路径| |aws.access.key.id | minioadmin | S3 access key| |aws.secret.access.key | minioadmin | S3 secret access key| diff --git a/docs/docs/zh/guide/installation/kubernetes.md b/docs/docs/zh/guide/installation/kubernetes.md index a5c66e3a4b..e8cd6c70b5 100644 --- a/docs/docs/zh/guide/installation/kubernetes.md +++ b/docs/docs/zh/guide/installation/kubernetes.md @@ -552,7 +552,7 @@ common: | | | | | `common.configmap.DOLPHINSCHEDULER_OPTS` | The jvm options for dolphinscheduler, suitable for all servers | `""` | | `common.configmap.DATA_BASEDIR_PATH` | User data directory path, self configuration, please make sure the directory exists and have read write permissions | `/tmp/dolphinscheduler` | -| `common.configmap.RESOURCE_STORAGE_TYPE` | Resource storage type: HDFS, S3, OSS, GCS, NONE | `HDFS` | +| `common.configmap.RESOURCE_STORAGE_TYPE` | Resource storage type: HDFS, S3, OSS, GCS, ABS, NONE | `HDFS` | | `common.configmap.RESOURCE_UPLOAD_PATH` | Resource store on HDFS/S3 path, please make sure the directory exists on hdfs and have read write permissions | `/dolphinscheduler` | | `common.configmap.FS_DEFAULT_FS` | Resource storage file system like `file:///`, `hdfs://mycluster:8020` or `s3a://dolphinscheduler` | `file:///` | | `common.configmap.FS_S3A_ENDPOINT` | S3 endpoint when `common.configmap.RESOURCE_STORAGE_TYPE` is set to `S3` | `s3.xxx.amazonaws.com` | diff --git a/docs/docs/zh/guide/remote-logging.md b/docs/docs/zh/guide/remote-logging.md index 86e7f8219f..7321badb1a 100644 --- a/docs/docs/zh/guide/remote-logging.md +++ b/docs/docs/zh/guide/remote-logging.md @@ -61,3 +61,19 @@ remote.logging.google.cloud.storage.credential=/path/to/credential remote.logging.google.cloud.storage.bucket.name= ``` +## 将任务日志写入[Azure Blob Storage (ABS)](https://azure.microsoft.com/en-us/products/storage/blobs) + +配置`common.propertis`如下: + +```properties +# abs container name, required if you set resource.storage.type=ABS +resource.azure.blob.storage.container.name= +# abs account name, required if you set resource.storage.type=ABS +resource.azure.blob.storage.account.name= +# abs connection string, required if you set resource.storage.type=ABS +resource.azure.blob.storage.connection.string= +``` + +### 注意事项 + +由于Azure Blob Storage不支持空目录单独存在,因此资源目录下会有空文件``。但是并不影响Dolphinscheduler资源中心上的文件展示。 diff --git a/docs/docs/zh/guide/resource/configuration.md b/docs/docs/zh/guide/resource/configuration.md index 7bd152c2f7..43ee13966e 100644 --- a/docs/docs/zh/guide/resource/configuration.md +++ b/docs/docs/zh/guide/resource/configuration.md @@ -79,7 +79,7 @@ resource.aws.s3.endpoint= # user data local directory path, please make sure the directory exists and have read write permissions data.basedir.path=/tmp/dolphinscheduler -# resource storage type: LOCAL, HDFS, S3, OSS, GCS +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS resource.storage.type=LOCAL # resource store on HDFS/S3/OSS path, resource file will store to this hadoop hdfs path, self configuration, diff --git a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/docker/file-manage/common.properties b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/docker/file-manage/common.properties index 3f6097402e..000341f153 100644 --- a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/docker/file-manage/common.properties +++ b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/docker/file-manage/common.properties @@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: HDFS, S3, OSS, GCS, NONE +# resource storage type: HDFS, S3, OSS, GCS, ABS, NONE resource.storage.type=S3 # resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended resource.storage.upload.base.path=/dolphinscheduler diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index cb3be7c661..fd5764e2e1 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -113,6 +113,7 @@ 2.8.0 6.0.0 1.6.0 + 1.2.10 @@ -700,6 +701,14 @@ ${google-cloud-storage.version} + + com.azure + azure-sdk-bom + ${azure-sdk-bom.version} + pom + import + + joda-time joda-time diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index dc919429c2..8291e55f41 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -141,6 +141,12 @@ public final class Constants { public static final String GOOGLE_CLOUD_STORAGE_CREDENTIAL = "resource.google.cloud.storage.credential"; + public static final String AZURE_BLOB_STORAGE_CONNECTION_STRING = "resource.azure.blob.storage.connection.string"; + + public static final String AZURE_BLOB_STORAGE_CONTAINER_NAME = "resource.azure.blob.storage.container.name"; + + public static final String AZURE_BLOB_STORAGE_ACCOUNT_NAME = "resource.azure.blob.storage.account.name"; + /** * fetch applicationId way */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java index e7cb75367c..2ebc2e1a5c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java @@ -21,5 +21,5 @@ package org.apache.dolphinscheduler.common.enums; * data base types */ public enum ResUploadType { - LOCAL, HDFS, S3, OSS, GCS, NONE + LOCAL, HDFS, S3, OSS, GCS, ABS, NONE } diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 90c4d82ded..e7c119197e 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: LOCAL, HDFS, S3, OSS, GCS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration # please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless # use shared file mount point resource.storage.type=LOCAL @@ -66,6 +66,13 @@ resource.google.cloud.storage.credential=/path/to/credential # gcs bucket name, required if you set resource.storage.type=GCS resource.google.cloud.storage.bucket.name= +# abs container name, required if you set resource.storage.type=ABS +resource.azure.blob.storage.container.name= +# abs account name, required if you set resource.storage.type=ABS +resource.azure.blob.storage.account.name= +# abs connection string, required if you set resource.storage.type=ABS +resource.azure.blob.storage.connection.string= + # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path resource.hdfs.root.user=hdfs # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index cf71d62cee..e3b49541d7 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -652,8 +652,8 @@ The text of each license is also included at licenses/LICENSE-[project].txt. bcprov-jdk15on 1.68: https://mvnrepository.com/artifact/org.bouncycastle/bcprov-jdk15on, MIT reactive-streams 1.0.4: https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams/1.0.4, MIT webjars-locator-core 0.50: https://mvnrepository.com/artifact/org.webjars/webjars-locator-core/0.50, MIT - azure-core 1.34.0: https://mvnrepository.com/artifact/com.azure/azure-core/1.34.0, MIT - azure-core-http-netty 1.12.7: https://mvnrepository.com/artifact/com.azure/azure-core-http-netty/1.12.7, MIT + azure-core 1.36.0: https://mvnrepository.com/artifact/com.azure/azure-core/1.36.0, MIT + azure-core-http-netty 1.13.0: https://mvnrepository.com/artifact/com.azure/azure-core-http-netty/1.13.0, MIT azure-identity 1.7.1: https://mvnrepository.com/artifact/com.azure/azure-identity/1.7.1, MIT msal4j 1.13.3: https://mvnrepository.com/artifact/com.microsoft.azure/msal4j/1.13.3, MIT msal4j-persistence-extension 1.1.0: https://mvnrepository.com/artifact/com.microsoft.azure/msal4j-persistence-extension/1.1.0, MIT @@ -682,11 +682,13 @@ The text of each license is also included at licenses/LICENSE-[project].txt. azure-resourcemanager-sql 2.21.0: https://mvnrepository.com/artifact/com.azure.resourcemanager/azure-resourcemanager-sql/2.21.0, MIT azure-resourcemanager-storage 2.21.0: https://mvnrepository.com/artifact/com.azure.resourcemanager/azure-resourcemanager-storage/2.21.0, MIT azure-resourcemanager-trafficmanager 2.21.0: https://mvnrepository.com/artifact/com.azure.resourcemanager/azure-resourcemanager-trafficmanager/2.21.0, MIT - azure-security-keyvault-keys 4.5.2: https://mvnrepository.com/artifact/com.azure/azure-security-keyvault-keys/4.5.2, MIT - azure-security-keyvault-secrets 4.5.2: https://mvnrepository.com/artifact/com.azure/azure-security-keyvault-secrets/4.5.2, MIT - azure-storage-common 12.19.1: https://mvnrepository.com/artifact/com.azure/azure-storage-common/12.19.1, MIT - azure-storage-file-share 12.16.1: https://mvnrepository.com/artifact/com.azure/azure-storage-file-share/12.16.1, MIT - azure-core-management 1.9.0: https://mvnrepository.com/artifact/com.azure/azure-core-management/1.9.0, MIT + azure-security-keyvault-keys 4.5.4: https://mvnrepository.com/artifact/com.azure/azure-security-keyvault-keys/4.5.4, MIT + azure-security-keyvault-secrets 4.5.4: https://mvnrepository.com/artifact/com.azure/azure-security-keyvault-secrets/4.5.4, MIT + azure-storage-common 12.20.0: https://mvnrepository.com/artifact/com.azure/azure-storage-common/12.20.0, MIT + azure-storage-file-share 12.17.0: https://mvnrepository.com/artifact/com.azure/azure-storage-file-share/12.17.0, MIT + azure-core-management 1.10.1: https://mvnrepository.com/artifact/com.azure/azure-core-management/1.10.1, MIT + azure-storage-blob 12.21.0: https://mvnrepository.com/artifact/com.azure/azure-storage-blob/12.21.0, MIT + azure-storage-internal-avro 12.6.0: https://mvnrepository.com/artifact/com.azure/azure-storage-internal-avro/12.6.0, MIT ======================================================================== MPL 1.1 licenses diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-azure-storage-blob.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-azure-storage-blob.txt new file mode 100644 index 0000000000..5e7b21f02e --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-azure-storage-blob.txt @@ -0,0 +1,14 @@ +Copyright(c) 2016 Microsoft Corporation +All rights reserved. + +MIT License +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), +to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, +and / or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions : + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +IN THE SOFTWARE. \ No newline at end of file diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-azure-storage-internal-avro.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-azure-storage-internal-avro.txt new file mode 100644 index 0000000000..5e7b21f02e --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-azure-storage-internal-avro.txt @@ -0,0 +1,14 @@ +Copyright(c) 2016 Microsoft Corporation +All rights reserved. + +MIT License +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), +to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, +and / or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions : + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +IN THE SOFTWARE. \ No newline at end of file diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties index 7303547d7a..f704bb60fc 100644 --- a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties +++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties @@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: HDFS, S3, OSS, GCS, NONE +# resource storage type: HDFS, S3, OSS, GCS, ABS, NONE resource.storage.type=S3 # resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended resource.storage.upload.base.path=/dolphinscheduler diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/pom.xml new file mode 100644 index 0000000000..e35c6d10fd --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-storage-plugin + dev-SNAPSHOT + + + dolphinscheduler-storage-abs + + + + com.azure + azure-storage-blob + + + + org.apache.dolphinscheduler + dolphinscheduler-storage-api + ${project.version} + + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/main/java/org/apache/dolphinscheduler/plugin/storage/abs/AbsStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/main/java/org/apache/dolphinscheduler/plugin/storage/abs/AbsStorageOperator.java new file mode 100644 index 0000000000..c95ebed2ff --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/main/java/org/apache/dolphinscheduler/plugin/storage/abs/AbsStorageOperator.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.storage.abs; + +import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; +import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S; +import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_FILE; +import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_UDF; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ResUploadType; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; +import org.apache.dolphinscheduler.spi.enums.ResourceType; + +import org.apache.commons.lang3.StringUtils; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Date; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import com.azure.core.http.rest.PagedIterable; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobContainerItem; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.specialized.BlockBlobClient; + +@Data +@Slf4j +public class AbsStorageOperator implements Closeable, StorageOperate { + + private BlobContainerClient blobContainerClient; + + private BlobServiceClient blobServiceClient; + + private String connectionString; + + private String storageAccountName; + + private String containerName; + + public AbsStorageOperator() { + + } + + public void init() { + containerName = readContainerName(); + connectionString = readConnectionString(); + storageAccountName = readAccountName(); + blobServiceClient = buildBlobServiceClient(); + blobContainerClient = buildBlobContainerClient(); + checkContainerNameExists(); + } + + protected BlobServiceClient buildBlobServiceClient() { + return new BlobServiceClientBuilder() + .endpoint("https://" + storageAccountName + ".blob.core.windows.net/") + .connectionString(connectionString) + .buildClient(); + } + + protected BlobContainerClient buildBlobContainerClient() { + return blobServiceClient.getBlobContainerClient(containerName); + } + + protected String readConnectionString() { + return PropertyUtils.getString(Constants.AZURE_BLOB_STORAGE_CONNECTION_STRING); + } + + protected String readContainerName() { + return PropertyUtils.getString(Constants.AZURE_BLOB_STORAGE_CONTAINER_NAME); + } + + protected String readAccountName() { + return PropertyUtils.getString(Constants.AZURE_BLOB_STORAGE_ACCOUNT_NAME); + } + + @Override + public void createTenantDirIfNotExists(String tenantCode) throws Exception { + mkdir(tenantCode, getAbsResDir(tenantCode)); + mkdir(tenantCode, getAbsUdfDir(tenantCode)); + } + + public String getAbsResDir(String tenantCode) { + return String.format("%s/" + RESOURCE_TYPE_FILE, getAbsTenantDir(tenantCode)); + } + + public String getAbsUdfDir(String tenantCode) { + return String.format("%s/" + RESOURCE_TYPE_UDF, getAbsTenantDir(tenantCode)); + } + + public String getAbsTenantDir(String tenantCode) { + return String.format(FORMAT_S_S, getGcsDataBasePath(), tenantCode); + } + + public String getGcsDataBasePath() { + if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) { + return EMPTY_STRING; + } else { + return RESOURCE_UPLOAD_PATH.replaceFirst(FOLDER_SEPARATOR, EMPTY_STRING); + } + } + + @Override + public String getResDir(String tenantCode) { + return getAbsResDir(tenantCode) + FOLDER_SEPARATOR; + } + + @Override + public String getUdfDir(String tenantCode) { + return getAbsUdfDir(tenantCode) + FOLDER_SEPARATOR; + } + + @Override + public String getResourceFileName(String tenantCode, String fullName) { + String resDir = getResDir(tenantCode); + return fullName.replaceFirst(resDir, ""); + } + + @Override + public String getResourceFullName(String tenantCode, String fileName) { + if (fileName.startsWith(FOLDER_SEPARATOR)) { + fileName.replaceFirst(FOLDER_SEPARATOR, EMPTY_STRING); + } + return String.format(FORMAT_S_S, getAbsResDir(tenantCode), fileName); + } + + @Override + public String getFileName(ResourceType resourceType, String tenantCode, String fileName) { + if (fileName.startsWith(FOLDER_SEPARATOR)) { + fileName = fileName.replaceFirst(FOLDER_SEPARATOR, EMPTY_STRING); + } + return getDir(resourceType, tenantCode) + fileName; + } + + @Override + public void download(String tenantCode, String srcFilePath, String dstFilePath, + boolean overwrite) throws IOException { + File dstFile = new File(dstFilePath); + if (dstFile.isDirectory()) { + Files.delete(dstFile.toPath()); + } else { + Files.createDirectories(dstFile.getParentFile().toPath()); + } + + BlobClient blobClient = blobContainerClient.getBlobClient(srcFilePath); + blobClient.downloadToFile(dstFilePath, true); + } + + @Override + public boolean exists(String fullName) throws IOException { + return isObjectExists(fullName); + } + + protected boolean isObjectExists(String objectName) { + return blobContainerClient.getBlobClient(objectName).exists(); + } + + @Override + public boolean delete(String filePath, boolean recursive) throws IOException { + try { + if (isObjectExists(filePath)) { + blobContainerClient.getBlobClient(filePath).delete(); + } + return true; + } catch (Exception e) { + log.error("delete the object error,the resource path is {}", filePath); + return false; + } + } + + @Override + public boolean delete(String fullName, List childrenPathList, boolean recursive) throws IOException { + // append the resource fullName to the list for deletion. + childrenPathList.add(fullName); + + boolean result = true; + for (String filePath : childrenPathList) { + if (!delete(filePath, recursive)) { + result = false; + } + } + + return result; + } + + @Override + public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException { + BlobClient srcBlobClient = blobContainerClient.getBlobClient(srcPath); + BlockBlobClient dstBlobClient = blobContainerClient.getBlobClient(dstPath).getBlockBlobClient(); + + dstBlobClient.uploadFromUrl(srcBlobClient.getBlobUrl(), overwrite); + + if (deleteSource) { + srcBlobClient.delete(); + } + return true; + } + + @Override + public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource, + boolean overwrite) throws IOException { + try { + BlobClient blobClient = blobContainerClient.getBlobClient(dstPath); + blobClient.uploadFromFile(srcFile, overwrite); + + Path srcPath = Paths.get(srcFile); + if (deleteSource) { + Files.delete(srcPath); + } + return true; + } catch (Exception e) { + log.error("upload failed,the container is {},the filePath is {}", containerName, dstPath); + return false; + } + } + + @Override + public List vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException { + if (StringUtils.isBlank(filePath)) { + log.error("file path:{} is blank", filePath); + return Collections.emptyList(); + } + + BlobClient blobClient = blobContainerClient.getBlobClient(filePath); + try ( + BufferedReader bufferedReader = + new BufferedReader(new InputStreamReader( + new ByteArrayInputStream(blobClient.downloadContent().toBytes())))) { + Stream stream = bufferedReader.lines().skip(skipLineNums).limit(limit); + return stream.collect(Collectors.toList()); + } + } + + @Override + public void deleteTenant(String tenantCode) throws Exception { + deleteTenantCode(tenantCode); + } + + protected void deleteTenantCode(String tenantCode) { + deleteDirectory(getResDir(tenantCode)); + deleteDirectory(getUdfDir(tenantCode)); + } + + @Override + public String getDir(ResourceType resourceType, String tenantCode) { + switch (resourceType) { + case UDF: + return getUdfDir(tenantCode); + case FILE: + return getResDir(tenantCode); + case ALL: + return getGcsDataBasePath(); + default: + return EMPTY_STRING; + } + + } + + protected void deleteDirectory(String directoryName) { + if (isObjectExists(directoryName)) { + blobContainerClient.getBlobClient(directoryName).delete(); + } + } + + @Override + public boolean mkdir(String tenantCode, String path) throws IOException { + String objectName = path + FOLDER_SEPARATOR; + if (!isObjectExists(objectName)) { + BlobClient blobClient = blobContainerClient.getBlobClient(objectName); + blobClient.upload(new ByteArrayInputStream(EMPTY_STRING.getBytes()), 0); + } + return true; + } + + @Override + public void close() throws IOException { + + } + + @Override + public ResUploadType returnStorageType() { + return ResUploadType.ABS; + } + + @Override + public List listFilesStatusRecursively(String path, String defaultPath, String tenantCode, + ResourceType type) { + List storageEntityList = new ArrayList<>(); + LinkedList foldersToFetch = new LinkedList<>(); + + StorageEntity initialEntity = null; + try { + initialEntity = getFileStatus(path, defaultPath, tenantCode, type); + } catch (Exception e) { + log.error("error while listing files status recursively, path: {}", path, e); + return storageEntityList; + } + foldersToFetch.add(initialEntity); + + while (!foldersToFetch.isEmpty()) { + String pathToExplore = foldersToFetch.pop().getFullName(); + try { + List tempList = listFilesStatus(pathToExplore, defaultPath, tenantCode, type); + for (StorageEntity temp : tempList) { + if (temp.isDirectory()) { + foldersToFetch.add(temp); + } + } + storageEntityList.addAll(tempList); + } catch (Exception e) { + log.error("error while listing files stat:wus recursively, path: {}", pathToExplore, e); + } + } + + return storageEntityList; + } + + @Override + public List listFilesStatus(String path, String defaultPath, String tenantCode, + ResourceType type) throws Exception { + List storageEntityList = new ArrayList<>(); + + PagedIterable blobItems; + blobItems = blobContainerClient.listBlobsByHierarchy(path); + if (blobItems == null) { + return storageEntityList; + } + + for (BlobItem blobItem : blobItems) { + if (path.equals(blobItem.getName())) { + continue; + } + if (blobItem.isPrefix()) { + String suffix = StringUtils.difference(path, blobItem.getName()); + String fileName = StringUtils.difference(defaultPath, blobItem.getName()); + StorageEntity entity = new StorageEntity(); + entity.setAlias(suffix); + entity.setFileName(fileName); + entity.setFullName(blobItem.getName()); + entity.setDirectory(true); + entity.setDescription(EMPTY_STRING); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(0); + entity.setCreateTime(null); + entity.setUpdateTime(null); + entity.setPfullName(path); + + storageEntityList.add(entity); + } else { + String[] aliasArr = blobItem.getName().split("/"); + String alias = aliasArr[aliasArr.length - 1]; + String fileName = StringUtils.difference(defaultPath, blobItem.getName()); + + StorageEntity entity = new StorageEntity(); + entity.setAlias(alias); + entity.setFileName(fileName); + entity.setFullName(blobItem.getName()); + entity.setDirectory(false); + entity.setDescription(EMPTY_STRING); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(blobItem.getProperties().getContentLength()); + entity.setCreateTime(Date.from(blobItem.getProperties().getCreationTime().toInstant())); + entity.setUpdateTime(Date.from(blobItem.getProperties().getLastModified().toInstant())); + entity.setPfullName(path); + + storageEntityList.add(entity); + } + } + + return storageEntityList; + } + + @Override + public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode, + ResourceType type) throws Exception { + if (path.endsWith(FOLDER_SEPARATOR)) { + // the path is a directory that may or may not exist + String alias = findDirAlias(path); + String fileName = StringUtils.difference(defaultPath, path); + + StorageEntity entity = new StorageEntity(); + entity.setAlias(alias); + entity.setFileName(fileName); + entity.setFullName(path); + entity.setDirectory(true); + entity.setDescription(EMPTY_STRING); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(0); + + return entity; + } else { + if (isObjectExists(path)) { + BlobClient blobClient = blobContainerClient.getBlobClient(path); + + String[] aliasArr = blobClient.getBlobName().split(FOLDER_SEPARATOR); + String alias = aliasArr[aliasArr.length - 1]; + String fileName = StringUtils.difference(defaultPath, blobClient.getBlobName()); + + StorageEntity entity = new StorageEntity(); + entity.setAlias(alias); + entity.setFileName(fileName); + entity.setFullName(blobClient.getBlobName()); + entity.setDirectory(false); + entity.setDescription(EMPTY_STRING); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(blobClient.getProperties().getBlobSize()); + entity.setCreateTime(Date.from(blobClient.getProperties().getCreationTime().toInstant())); + entity.setUpdateTime(Date.from(blobClient.getProperties().getLastModified().toInstant())); + + return entity; + } else { + throw new FileNotFoundException("Object is not found in ABS container: " + containerName); + } + } + } + + private String findDirAlias(String dirPath) { + if (!dirPath.endsWith(FOLDER_SEPARATOR)) { + return dirPath; + } + + Path path = Paths.get(dirPath); + return path.getName(path.getNameCount() - 1) + FOLDER_SEPARATOR; + } + + public void checkContainerNameExists() { + if (StringUtils.isBlank(containerName)) { + throw new IllegalArgumentException(containerName + " is blank"); + } + + boolean exist = false; + for (BlobContainerItem item : blobServiceClient.listBlobContainers()) { + if (containerName.equals(item.getName())) { + exist = true; + break; + } + } + + if (!exist) { + throw new IllegalArgumentException( + "containerName: " + containerName + " is not exists, you need to create them by yourself"); + } else { + log.info("containerName: {} has been found", containerName); + } + } +} diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/main/java/org/apache/dolphinscheduler/plugin/storage/abs/AbsStorageOperatorFactory.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/main/java/org/apache/dolphinscheduler/plugin/storage/abs/AbsStorageOperatorFactory.java new file mode 100644 index 0000000000..1909ece6f1 --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/main/java/org/apache/dolphinscheduler/plugin/storage/abs/AbsStorageOperatorFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.storage.abs; + +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory; +import org.apache.dolphinscheduler.plugin.storage.api.StorageType; + +import com.google.auto.service.AutoService; + +@AutoService(StorageOperateFactory.class) +public class AbsStorageOperatorFactory implements StorageOperateFactory { + + @Override + public StorageOperate createStorageOperate() { + AbsStorageOperator absStorageOperator = new AbsStorageOperator(); + absStorageOperator.init(); + return absStorageOperator; + } + + @Override + public StorageType getStorageOperate() { + return StorageType.ABS; + } +} diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/test/java/org/apache/dolphinscheduler/plugin/storage/abs/AbsStorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/test/java/org/apache/dolphinscheduler/plugin/storage/abs/AbsStorageOperatorTest.java new file mode 100644 index 0000000000..daec0c36b2 --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/test/java/org/apache/dolphinscheduler/plugin/storage/abs/AbsStorageOperatorTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.storage.abs; + +import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; +import org.apache.dolphinscheduler.spi.enums.ResourceType; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.specialized.BlockBlobClient; + +@ExtendWith(MockitoExtension.class) +public class AbsStorageOperatorTest { + + private static final String CONNECTION_STRING_MOCK = "CONNECTION_STRING_MOCK"; + + private static final String ACCOUNT_NAME_MOCK = "ACCOUNT_NAME_MOCK"; + + private static final String CONTAINER_NAME_MOCK = "CONTAINER_NAME_MOCK"; + + private static final String TENANT_CODE_MOCK = "TENANT_CODE_MOCK"; + + private static final String DIR_MOCK = "DIR_MOCK"; + + private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK"; + + private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK"; + + private static final String FULL_NAME = "/tmp/dir1/"; + + private static final String DEFAULT_PATH = "/tmp/"; + + @Mock + private BlobContainerClient blobContainerClient; + + @Mock + private BlobServiceClient blobServiceClient; + + @Mock + private BlockBlobClient blockBlobClient; + + @Mock + private BlobClient blobClient; + + private AbsStorageOperator absStorageOperator; + + @BeforeEach + public void setUp() throws Exception { + absStorageOperator = Mockito.spy(AbsStorageOperator.class); + Mockito.doReturn(CONNECTION_STRING_MOCK).when(absStorageOperator).readConnectionString(); + Mockito.doReturn(CONTAINER_NAME_MOCK).when(absStorageOperator).readContainerName(); + Mockito.doReturn(ACCOUNT_NAME_MOCK).when(absStorageOperator).readAccountName(); + Mockito.doReturn(blobContainerClient).when(absStorageOperator).buildBlobContainerClient(); + Mockito.doReturn(blobServiceClient).when(absStorageOperator).buildBlobServiceClient(); + Mockito.doNothing().when(absStorageOperator).checkContainerNameExists(); + + absStorageOperator.init(); + } + + @Test + public void testInit() throws Exception { + verify(absStorageOperator, times(1)).buildBlobServiceClient(); + verify(absStorageOperator, times(1)).buildBlobContainerClient(); + Assertions.assertEquals(CONNECTION_STRING_MOCK, absStorageOperator.getConnectionString()); + Assertions.assertEquals(CONTAINER_NAME_MOCK, absStorageOperator.getContainerName()); + Assertions.assertEquals(ACCOUNT_NAME_MOCK, absStorageOperator.getStorageAccountName()); + } + + @Test + public void createTenantResAndUdfDir() throws Exception { + doReturn(DIR_MOCK).when(absStorageOperator).getAbsResDir(TENANT_CODE_MOCK); + doReturn(DIR_MOCK).when(absStorageOperator).getAbsUdfDir(TENANT_CODE_MOCK); + doReturn(true).when(absStorageOperator).mkdir(TENANT_CODE_MOCK, DIR_MOCK); + absStorageOperator.createTenantDirIfNotExists(TENANT_CODE_MOCK); + verify(absStorageOperator, times(2)).mkdir(TENANT_CODE_MOCK, DIR_MOCK); + } + + @Test + public void getResDir() { + final String expectedResourceDir = String.format("dolphinscheduler/%s/resources/", TENANT_CODE_MOCK); + final String dir = absStorageOperator.getResDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedResourceDir, dir); + } + + @Test + public void getUdfDir() { + final String expectedUdfDir = String.format("dolphinscheduler/%s/udfs/", TENANT_CODE_MOCK); + final String dir = absStorageOperator.getUdfDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedUdfDir, dir); + } + + @Test + public void mkdirWhenDirExists() { + boolean isSuccess = false; + try { + final String key = DIR_MOCK + FOLDER_SEPARATOR; + Mockito.doReturn(true).when(absStorageOperator).isObjectExists(key); + isSuccess = absStorageOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK); + + } catch (IOException e) { + Assertions.fail("test failed due to unexpected IO exception"); + } + + Assertions.assertTrue(isSuccess); + } + + @Test + public void getResourceFullName() { + final String expectedResourceFileName = + String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String resourceFileName = absStorageOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK); + Assertions.assertEquals(expectedResourceFileName, resourceFileName); + } + + @Test + public void getFileName() { + final String expectedFileName = + String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String fileName = absStorageOperator.getFileName(ResourceType.FILE, TENANT_CODE_MOCK, FILE_NAME_MOCK); + Assertions.assertEquals(expectedFileName, fileName); + } + + @Test + public void exists() { + boolean doesExist = false; + doReturn(true).when(absStorageOperator).isObjectExists(FILE_NAME_MOCK); + try { + doesExist = absStorageOperator.exists(FILE_NAME_MOCK); + } catch (IOException e) { + Assertions.fail("unexpected IO exception in unit test"); + } + + Assertions.assertTrue(doesExist); + } + + @Test + public void delete() { + boolean isDeleted = false; + doReturn(true).when(absStorageOperator).isObjectExists(FILE_NAME_MOCK); + Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(Mockito.anyString()); + try { + isDeleted = absStorageOperator.delete(FILE_NAME_MOCK, true); + } catch (IOException e) { + Assertions.fail("unexpected IO exception in unit test"); + } + + Assertions.assertTrue(isDeleted); + verify(blobClient, times(1)).delete(); + } + + @Test + public void copy() { + boolean isSuccess = false; + Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(Mockito.anyString()); + Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient(); + try { + isSuccess = absStorageOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false); + } catch (IOException e) { + Assertions.fail("unexpected IO exception in unit test"); + } + + Assertions.assertTrue(isSuccess); + } + + @Test + public void deleteTenant() { + doNothing().when(absStorageOperator).deleteTenantCode(anyString()); + try { + absStorageOperator.deleteTenant(TENANT_CODE_MOCK); + } catch (Exception e) { + Assertions.fail("unexpected exception caught in unit test"); + } + + verify(absStorageOperator, times(1)).deleteTenantCode(anyString()); + } + + @Test + public void getGcsResDir() { + final String expectedGcsResDir = String.format("dolphinscheduler/%s/resources", TENANT_CODE_MOCK); + final String gcsResDir = absStorageOperator.getAbsResDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedGcsResDir, gcsResDir); + } + + @Test + public void getGcsUdfDir() { + final String expectedGcsUdfDir = String.format("dolphinscheduler/%s/udfs", TENANT_CODE_MOCK); + final String gcsUdfDir = absStorageOperator.getAbsUdfDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedGcsUdfDir, gcsUdfDir); + } + + @Test + public void getGcsTenantDir() { + final String expectedGcsTenantDir = String.format(FORMAT_S_S, DIR_MOCK, TENANT_CODE_MOCK); + doReturn(DIR_MOCK).when(absStorageOperator).getGcsDataBasePath(); + final String gcsTenantDir = absStorageOperator.getAbsTenantDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedGcsTenantDir, gcsTenantDir); + } + + @Test + public void deleteDir() { + Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(Mockito.anyString()); + doReturn(true).when(absStorageOperator).isObjectExists(Mockito.any()); + absStorageOperator.deleteDirectory(DIR_MOCK); + verify(blobClient, times(1)).delete(); + } + + @Test + public void testGetFileStatus() throws Exception { + StorageEntity entity = + absStorageOperator.getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); + Assertions.assertEquals(FULL_NAME, entity.getFullName()); + Assertions.assertEquals("dir1/", entity.getFileName()); + } + + @Test + public void testListFilesStatus() throws Exception { + Mockito.doReturn(null).when(blobContainerClient).listBlobsByHierarchy(Mockito.any()); + List result = + absStorageOperator.listFilesStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); + verify(blobContainerClient, times(1)).listBlobsByHierarchy(Mockito.any()); + } + + @Test + public void testListFilesStatusRecursively() throws Exception { + StorageEntity entity = new StorageEntity(); + entity.setFullName(FULL_NAME); + + doReturn(entity).when(absStorageOperator).getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, + ResourceType.FILE); + doReturn(Collections.EMPTY_LIST).when(absStorageOperator).listFilesStatus(anyString(), anyString(), anyString(), + Mockito.any(ResourceType.class)); + + List result = + absStorageOperator.listFilesStatusRecursively(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, + ResourceType.FILE); + Assertions.assertEquals(0, result.size()); + } +} diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml index b5ee535c89..a85fcd3c4e 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml @@ -52,6 +52,11 @@ dolphinscheduler-storage-gcs ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-storage-abs + ${project.version} + diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java index 5a7da2ea98..33c34f4c53 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java @@ -25,7 +25,9 @@ public enum StorageType { HDFS(1, "HDFS"), OSS(2, "OSS"), S3(3, "S3"), - GCS(4, "GCS"); + GCS(4, "GCS"), + + ABS(5, "ABS"); private final int code; private final String name; diff --git a/dolphinscheduler-storage-plugin/pom.xml b/dolphinscheduler-storage-plugin/pom.xml index a8780f8580..7e27358a64 100644 --- a/dolphinscheduler-storage-plugin/pom.xml +++ b/dolphinscheduler-storage-plugin/pom.xml @@ -34,6 +34,7 @@ dolphinscheduler-storage-hdfs dolphinscheduler-storage-oss dolphinscheduler-storage-gcs + dolphinscheduler-storage-abs diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/common.properties b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/common.properties index 9cd68aeb55..5fab54a143 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/common.properties +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/common.properties @@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: HDFS, S3, OSS, GCS, NONE +# resource storage type: HDFS, S3, OSS, GCS, ABS, NONE resource.storage.type=NONE # resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended resource.storage.upload.base.path=/dolphinscheduler diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 40714ce7d2..11d061d2c8 100644 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -371,8 +371,8 @@ jdom2-2.0.6.1.jar jettison-1.1.jar stax-api-1.0.1.jar trino-jdbc-402.jar -azure-core-1.34.0.jar -azure-core-http-netty-1.12.7.jar +azure-core-1.36.0.jar +azure-core-http-netty-1.13.0.jar azure-identity-1.7.1.jar content-type-2.2.jar jackson-dataformat-xml-2.13.3.jar @@ -423,13 +423,13 @@ azure-resourcemanager-servicebus-2.21.0.jar azure-resourcemanager-sql-2.21.0.jar azure-resourcemanager-storage-2.21.0.jar azure-resourcemanager-trafficmanager-2.21.0.jar -azure-security-keyvault-keys-4.5.2.jar -azure-security-keyvault-secrets-4.5.2.jar -azure-storage-common-12.19.1.jar -azure-storage-file-share-12.16.1.jar +azure-security-keyvault-keys-4.5.4.jar +azure-security-keyvault-secrets-4.5.4.jar +azure-storage-common-12.20.0.jar +azure-storage-file-share-12.17.0.jar nimbus-jose-jwt-9.22.jar woodstox-core-6.4.0.jar -azure-core-management-1.9.0.jar +azure-core-management-1.10.1.jar api-common-2.6.0.jar auto-value-1.10.1.jar auto-value-annotations-1.10.1.jar @@ -480,4 +480,6 @@ casdoor-spring-boot-starter-1.6.0.jar org.apache.oltu.oauth2.client-1.0.2.jar org.apache.oltu.oauth2.common-1.0.2.jar snowflake-jdbc-3.13.10.jar +azure-storage-blob-12.21.0.jar +azure-storage-internal-avro-12.6.0.jar vertica-jdbc-12.0.4-0.jar