
[Feature][Resource Center] Add support for Huawei Cloud OBS (#14643)

* [Feature][Resource Center] Add support for Huawei Cloud OBS as storage of resource center

* add license and add doc

* add 3-party dependency license

* Update LICENSE

* fix

* Update pom.xml

* fix

* fix

---------

Co-authored-by: sunkang <sunkang@hjfruit.com>
Co-authored-by: xiangzihao <460888207@qq.com>
Co-authored-by: Rick Cheng <rickchengx@gmail.com>
Branch: 3.2.1-prepare
Edison Catto, 1 year ago, committed by GitHub
commit 25480ae9e7
 1. docs/docs/en/guide/resource/configuration.md (13 changed lines)
 2. docs/docs/zh/guide/resource/configuration.md (13 changed lines)
 3. dolphinscheduler-bom/pom.xml (18 changed lines)
 4. dolphinscheduler-common/pom.xml (5 changed lines)
 5. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java (3 changed lines)
 6. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java (2 changed lines)
 7. dolphinscheduler-common/src/main/resources/common.properties (13 changed lines)
 8. dolphinscheduler-dist/release-docs/LICENSE (1 changed line)
 9. dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt (202 changed lines)
10. dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml (5 changed lines)
11. dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java (4 changed lines)
12. dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml (46 changed lines)
13. dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java (513 changed lines)
14. dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java (43 changed lines)
15. dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java (291 changed lines)
16. dolphinscheduler-storage-plugin/pom.xml (1 changed line)
17. dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java (6 changed lines)
18. tools/dependencies/known-dependencies.txt (1 changed line)

docs/docs/en/guide/resource/configuration.md (13 changed lines)

@@ -1,7 +1,7 @@
 # Resource Center Configuration
 - You could use `Resource Center` to upload text files, UDFs and other task-related files.
-- You could configure `Resource Center` to use distributed file system like [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), [MinIO](https://github.com/minio/minio) cluster or remote storage products like [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), etc.
+- You could configure `Resource Center` to use a distributed file system like [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), a [MinIO](https://github.com/minio/minio) cluster, or remote storage products like [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html), etc.
 - You could configure `Resource Center` to use local file system. If you deploy `DolphinScheduler` in `Standalone` mode, you could configure it to use local file system for `Resource Center` without the need of an external `HDFS` system or `S3`.
 - Furthermore, if you deploy `DolphinScheduler` in `Cluster` mode, you could use [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) to mount `S3` or [JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html) to mount `OSS` to your machines and use the local file system for `Resource Center`. In this way, you could operate remote files as if on your local machines.
@@ -80,7 +80,7 @@ data.basedir.path=/tmp/dolphinscheduler
 # resource view suffixs
 #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
-# resource storage type: HDFS, S3, OSS, GCS, ABS, NONE
+# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
 resource.storage.type=NONE
 # resource store on HDFS/S3/OSS path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
 resource.storage.upload.base.path=/tmp/dolphinscheduler
@@ -107,6 +107,15 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
 # oss bucket endpoint, required if you set resource.storage.type=OSS
 resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com
+# huawei cloud access key id, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.id=<your-access-key-id>
+# huawei cloud access key secret, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.secret=<your-access-key-secret>
+# obs bucket name, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.bucket.name=dolphinscheduler
+# obs bucket endpoint, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
 # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
 resource.hdfs.root.user=hdfs
 # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir

docs/docs/zh/guide/resource/configuration.md (13 changed lines)

@@ -1,7 +1,7 @@
 # Resource Center Configuration Details
 - The Resource Center is typically used for operations such as uploading files, UDF functions, and task group management.
-- The Resource Center can connect to a distributed file storage system such as [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+) or a [MinIO](https://github.com/minio/minio) cluster, or to remote object storage such as [AWS S3](https://aws.amazon.com/s3/) or [Alibaba Cloud OSS](https://www.aliyun.com/product/oss).
+- The Resource Center can connect to a distributed file storage system such as [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+) or a [MinIO](https://github.com/minio/minio) cluster, or to remote object storage such as [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), or [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html).
 - The Resource Center can also work directly with the local file system. In standalone mode, you can try it with the local file system without depending on an external storage system such as `Hadoop` or `S3`.
 - In addition, for cluster-mode deployments, you can use [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) to mount `S3` locally, or [JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html) to mount `OSS` locally, and then let the Resource Center operate on files in remote object storage through the local file system.
@@ -79,7 +79,7 @@ resource.aws.s3.endpoint=
 # user data local directory path, please make sure the directory exists and have read write permissions
 data.basedir.path=/tmp/dolphinscheduler
-# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS
+# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
 resource.storage.type=LOCAL
 # resource store on HDFS/S3/OSS path, resource file will store to this hadoop hdfs path, self configuration,
@@ -108,6 +108,15 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
 # oss bucket endpoint, required if you set resource.storage.type=OSS
 resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com
+# huawei cloud access key id, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.id=<your-access-key-id>
+# huawei cloud access key secret, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.secret=<your-access-key-secret>
+# obs bucket name, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.bucket.name=dolphinscheduler
+# obs bucket endpoint, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
 # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
 resource.hdfs.root.user=root
 # if resource.storage.type=S3, the value like: s3a://dolphinscheduler;

dolphinscheduler-bom/pom.xml (18 changed lines)

@@ -115,6 +115,7 @@
         <casdoor.version>1.6.0</casdoor.version>
         <azure-sdk-bom.version>1.2.10</azure-sdk-bom.version>
         <protobuf.version>3.17.2</protobuf.version>
+        <esdk-obs.version>3.23.3</esdk-obs.version>
         <system-lambda.version>1.2.1</system-lambda.version>
     </properties>
@@ -896,6 +897,23 @@
             <artifactId>protobuf-java</artifactId>
             <version>${protobuf.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.huaweicloud</groupId>
+            <artifactId>esdk-obs-java-bundle</artifactId>
+            <version>${esdk-obs.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>com.github.stefanbirkner</groupId>
             <artifactId>system-lambda</artifactId>

dolphinscheduler-common/pom.xml (5 changed lines)

@@ -93,6 +93,11 @@
             <artifactId>aws-java-sdk-s3</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.huaweicloud</groupId>
+            <artifactId>esdk-obs-java-bundle</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.github.oshi</groupId>
             <artifactId>oshi-core</artifactId>

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java (3 changed lines)

@@ -147,6 +147,9 @@ public final class Constants {
     public static final String AZURE_BLOB_STORAGE_ACCOUNT_NAME = "resource.azure.blob.storage.account.name";
+    public static final String HUAWEI_CLOUD_OBS_BUCKET_NAME = "resource.huawei.cloud.obs.bucket.name";
+    public static final String HUAWEI_CLOUD_OBS_END_POINT = "resource.huawei.cloud.obs.endpoint";
     /**
      * fetch applicationId way
      */

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java (2 changed lines)

@@ -21,5 +21,5 @@ package org.apache.dolphinscheduler.common.enums;
  * data base types
  */
 public enum ResUploadType {
-    LOCAL, HDFS, S3, OSS, GCS, ABS, NONE
+    LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
 }

dolphinscheduler-common/src/main/resources/common.properties (13 changed lines)

@@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler
 # resource view suffixs
 #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
-# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
+# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
 # please notice that LOCAL mode does not support reading and writing in distributed mode, which means you can only use your resource on one machine, unless
 # you use a shared file mount point
 resource.storage.type=LOCAL
@@ -73,6 +73,17 @@ resource.azure.blob.storage.account.name=<your-account-name>
 # abs connection string, required if you set resource.storage.type=ABS
 resource.azure.blob.storage.connection.string=<your-connection-string>
+# huawei cloud access key id, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.id=<your-access-key-id>
+# huawei cloud access key secret, required if you set resource.storage.type=OBS
+resource.huawei.cloud.access.key.secret=<your-access-key-secret>
+# obs bucket name, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.bucket.name=dolphinscheduler
+# obs bucket endpoint, required if you set resource.storage.type=OBS
+resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
 # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
 resource.hdfs.root.user=hdfs
 # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir

dolphinscheduler-dist/release-docs/LICENSE (vendored, 1 changed line)

@@ -567,6 +567,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 casdoor-spring-boot-starter 1.6.0 https://mvnrepository.com/artifact/org.casbin/casdoor-spring-boot-starter/1.6.0 Apache 2.0
 org.apache.oltu.oauth2.client 1.0.2 https://mvnrepository.com/artifact/org.apache.oltu.oauth2/org.apache.oltu.oauth2.client/1.0.2 Apache 2.0
 org.apache.oltu.oauth2.common 1.0.2 https://mvnrepository.com/artifact/org.apache.oltu.oauth2/org.apache.oltu.oauth2.common/1.0.2 Apache 2.0
+esdk-obs-java-bundle 3.23.3 https://mvnrepository.com/artifact/com.huaweicloud/esdk-obs-java-bundle/3.23.3 Apache 2.0

dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt (vendored, new file, 202 lines)

@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml (5 changed lines)

@@ -57,6 +57,11 @@
             <artifactId>dolphinscheduler-storage-abs</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-obs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>

dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java (4 changed lines)

@@ -27,7 +27,9 @@ public enum StorageType {
     S3(3, "S3"),
     GCS(4, "GCS"),
-    ABS(5, "ABS");
+    ABS(5, "ABS"),
+    OBS(6, "OBS");

     private final int code;
     private final String name;
dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml (new file, 46 lines)

@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~     http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.dolphinscheduler</groupId>
        <artifactId>dolphinscheduler-storage-plugin</artifactId>
        <version>dev-SNAPSHOT</version>
    </parent>
    <artifactId>dolphinscheduler-storage-obs</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.huaweicloud</groupId>
            <artifactId>esdk-obs-java-bundle</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.dolphinscheduler</groupId>
            <artifactId>dolphinscheduler-storage-api</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.dolphinscheduler</groupId>
            <artifactId>dolphinscheduler-task-api</artifactId>
            <version>${project.version}</version>
        </dependency>
    </dependencies>
</project>

dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java (new file, 513 lines)

@@ -0,0 +1,513 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.storage.obs;

import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_UDF;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import com.obs.services.ObsClient;
import com.obs.services.exception.ObsException;
import com.obs.services.internal.ServiceException;
import com.obs.services.model.DeleteObjectsRequest;
import com.obs.services.model.GetObjectRequest;
import com.obs.services.model.ListObjectsRequest;
import com.obs.services.model.ObjectListing;
import com.obs.services.model.ObjectMetadata;
import com.obs.services.model.ObsObject;
import com.obs.services.model.PutObjectRequest;

@Data
@Slf4j
public class ObsStorageOperator implements Closeable, StorageOperate {

    private String accessKeyId;

    private String accessKeySecret;

    private String bucketName;

    private String endPoint;

    private ObsClient obsClient;

    public ObsStorageOperator() {
    }

    public void init() {
        this.accessKeyId = readObsAccessKeyID();
        this.accessKeySecret = readObsAccessKeySecret();
        this.endPoint = readObsEndPoint();
        this.bucketName = readObsBucketName();
        this.obsClient = buildObsClient();
        ensureBucketSuccessfullyCreated(bucketName);
    }

    protected String readObsAccessKeyID() {
        return PropertyUtils.getString(TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_ID);
    }

    protected String readObsAccessKeySecret() {
        return PropertyUtils.getString(TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_SECRET);
    }

    protected String readObsBucketName() {
        return PropertyUtils.getString(Constants.HUAWEI_CLOUD_OBS_BUCKET_NAME);
    }

    protected String readObsEndPoint() {
        return PropertyUtils.getString(Constants.HUAWEI_CLOUD_OBS_END_POINT);
    }

    @Override
    public void close() throws IOException {
        obsClient.close();
    }

    @Override
    public void createTenantDirIfNotExists(String tenantCode) throws Exception {
mkdir(tenantCode, getObsResDir(tenantCode));
mkdir(tenantCode, getObsUdfDir(tenantCode));
}
@Override
public String getResDir(String tenantCode) {
return getObsResDir(tenantCode) + FOLDER_SEPARATOR;
}
@Override
public String getUdfDir(String tenantCode) {
return getObsUdfDir(tenantCode) + FOLDER_SEPARATOR;
}
@Override
public boolean mkdir(String tenantCode, String path) throws IOException {
final String key = path + FOLDER_SEPARATOR;
if (!obsClient.doesObjectExist(bucketName, key)) {
createObsPrefix(bucketName, key);
}
return true;
}
protected void createObsPrefix(final String bucketName, final String key) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0L);
InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, emptyContent);
obsClient.putObject(putObjectRequest);
}
@Override
public String getResourceFullName(String tenantCode, String fileName) {
if (fileName.startsWith(FOLDER_SEPARATOR)) {
fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
}
return String.format(FORMAT_S_S, getObsResDir(tenantCode), fileName);
}
@Override
public String getResourceFileName(String tenantCode, String fullName) {
String resDir = getResDir(tenantCode);
return fullName.replaceFirst(resDir, "");
}
@Override
public String getFileName(ResourceType resourceType, String tenantCode, String fileName) {
if (fileName.startsWith(FOLDER_SEPARATOR)) {
fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
}
return getDir(resourceType, tenantCode) + fileName;
}
@Override
public boolean delete(String fullName, List<String> childrenPathList, boolean recursive) throws IOException {
// append the resource fullName to the list for deletion.
childrenPathList.add(fullName);
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName);
for (String deleteKey : childrenPathList) {
deleteObjectsRequest.addKeyAndVersion(deleteKey);
}
try {
obsClient.deleteObjects(deleteObjectsRequest);
} catch (Exception e) {
log.error("delete objects error", e);
return false;
}
return true;
}
@Override
public void download(String tenantCode, String srcFilePath, String dstFilePath,
boolean overwrite) throws IOException {
File dstFile = new File(dstFilePath);
if (dstFile.isDirectory()) {
Files.delete(dstFile.toPath());
} else {
Files.createDirectories(dstFile.getParentFile().toPath());
}
ObsObject obsObject = obsClient.getObject(bucketName, srcFilePath);
try (
InputStream obsInputStream = obsObject.getObjectContent();
FileOutputStream fos = new FileOutputStream(dstFilePath)) {
byte[] readBuf = new byte[1024];
int readLen;
while ((readLen = obsInputStream.read(readBuf)) > 0) {
fos.write(readBuf, 0, readLen);
}
} catch (ObsException e) {
throw new IOException(e);
} catch (FileNotFoundException e) {
log.error("cannot find the destination file {}", dstFilePath);
throw e;
}
}
@Override
public boolean exists(String fileName) throws IOException {
return obsClient.doesObjectExist(bucketName, fileName);
}
@Override
public boolean delete(String filePath, boolean recursive) throws IOException {
try {
obsClient.deleteObject(bucketName, filePath);
return true;
} catch (ObsException e) {
log.error("fail to delete the object, the resource path is {}", filePath, e);
return false;
}
}
@Override
public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException {
obsClient.copyObject(bucketName, srcPath, bucketName, dstPath);
if (deleteSource) {
obsClient.deleteObject(bucketName, srcPath);
}
return true;
}
@Override
public String getDir(ResourceType resourceType, String tenantCode) {
switch (resourceType) {
case UDF:
return getUdfDir(tenantCode);
case FILE:
return getResDir(tenantCode);
case ALL:
return getObsDataBasePath();
default:
return "";
}
}
@Override
public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource,
boolean overwrite) throws IOException {
try {
obsClient.putObject(bucketName, dstPath, new File(srcFile));
if (deleteSource) {
Files.delete(Paths.get(srcFile));
}
return true;
} catch (ObsException e) {
log.error("upload failed, the bucketName is {}, the filePath is {}", bucketName, dstPath, e);
return false;
}
}
@Override
public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException {
if (StringUtils.isBlank(filePath)) {
log.error("file path:{} is empty", filePath);
return Collections.emptyList();
}
ObsObject obsObject = obsClient.getObject(bucketName, filePath);
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(obsObject.getObjectContent()))) {
Stream<String> stream = bufferedReader.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
}
}
@Override
public ResUploadType returnStorageType() {
return ResUploadType.OBS;
}
@Override
public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode,
ResourceType type) {
List<StorageEntity> storageEntityList = new ArrayList<>();
LinkedList<StorageEntity> foldersToFetch = new LinkedList<>();
StorageEntity initialEntity = null;
try {
initialEntity = getFileStatus(path, defaultPath, tenantCode, type);
} catch (Exception e) {
log.error("error while listing files status recursively, path: {}", path, e);
return storageEntityList;
}
foldersToFetch.add(initialEntity);
while (!foldersToFetch.isEmpty()) {
String pathToExplore = foldersToFetch.pop().getFullName();
try {
List<StorageEntity> tempList = listFilesStatus(pathToExplore, defaultPath, tenantCode, type);
for (StorageEntity temp : tempList) {
if (temp.isDirectory()) {
foldersToFetch.add(temp);
}
}
storageEntityList.addAll(tempList);
} catch (Exception e) {
log.error("error while listing files status recursively, path: {}", pathToExplore, e);
}
}
return storageEntityList;
}
@Override
public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode,
ResourceType type) throws Exception {
List<StorageEntity> storageEntityList = new ArrayList<>();
ListObjectsRequest request = new ListObjectsRequest();
request.setBucketName(bucketName);
request.setPrefix(path);
request.setDelimiter(FOLDER_SEPARATOR);
ObjectListing result = null;
try {
result = obsClient.listObjects(request);
} catch (Exception e) {
throw new ServiceException("Get ObsClient file list exception", e);
}
while (result != null) {
String nextMarker = result.getNextMarker();
List<ObsObject> objects = result.getObjects();
for (ObsObject object : objects) {
if (!object.getObjectKey().endsWith(FOLDER_SEPARATOR)) {
// the path is a file
String[] aliasArr = object.getObjectKey().split(FOLDER_SEPARATOR);
String alias = aliasArr[aliasArr.length - 1];
String fileName = StringUtils.difference(defaultPath, object.getObjectKey());
StorageEntity entity = new StorageEntity();
ObjectMetadata metadata = object.getMetadata();
entity.setAlias(alias);
entity.setFileName(fileName);
entity.setFullName(object.getObjectKey());
entity.setDirectory(false);
entity.setUserName(tenantCode);
entity.setType(type);
entity.setSize(metadata.getContentLength());
entity.setCreateTime(metadata.getLastModified());
entity.setUpdateTime(metadata.getLastModified());
entity.setPfullName(path);
storageEntityList.add(entity);
}
}
for (String commonPrefix : result.getCommonPrefixes()) {
// the paths in commonPrefix are directories
String suffix = StringUtils.difference(path, commonPrefix);
String fileName = StringUtils.difference(defaultPath, commonPrefix);
StorageEntity entity = new StorageEntity();
entity.setAlias(suffix);
entity.setFileName(fileName);
entity.setFullName(commonPrefix);
entity.setDirectory(true);
entity.setUserName(tenantCode);
entity.setType(type);
entity.setSize(0);
entity.setCreateTime(null);
entity.setUpdateTime(null);
entity.setPfullName(path);
storageEntityList.add(entity);
}
if (StringUtils.isBlank(nextMarker)) {
break;
}
request.setMarker(nextMarker);
try {
result = obsClient.listObjects(request);
} catch (Exception e) {
throw new ServiceException("Get ObsClient file list exception", e);
}
}
return storageEntityList;
}
@Override
public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
ResourceType type) throws Exception {
if (path.endsWith(FOLDER_SEPARATOR)) {
// the path is a directory that may or may not exist in ObsClient
String alias = findDirAlias(path);
String fileName = StringUtils.difference(defaultPath, path);
StorageEntity entity = new StorageEntity();
entity.setAlias(alias);
entity.setFileName(fileName);
entity.setFullName(path);
entity.setDirectory(true);
entity.setUserName(tenantCode);
entity.setType(type);
entity.setSize(0);
return entity;
} else {
GetObjectRequest request = new GetObjectRequest();
request.setBucketName(bucketName);
request.setObjectKey(path);
ObsObject object;
try {
object = obsClient.getObject(request);
} catch (Exception e) {
throw new ServiceException("Get ObsClient file list exception", e);
}
String[] aliasArr = object.getObjectKey().split(FOLDER_SEPARATOR);
String alias = aliasArr[aliasArr.length - 1];
String fileName = StringUtils.difference(defaultPath, object.getObjectKey());
StorageEntity entity = new StorageEntity();
ObjectMetadata metadata = object.getMetadata();
entity.setAlias(alias);
entity.setFileName(fileName);
entity.setFullName(object.getObjectKey());
entity.setDirectory(false);
entity.setUserName(tenantCode);
entity.setType(type);
entity.setSize(metadata.getContentLength());
entity.setCreateTime(metadata.getLastModified());
entity.setUpdateTime(metadata.getLastModified());
return entity;
}
}
@Override
public void deleteTenant(String tenantCode) throws Exception {
deleteTenantCode(tenantCode);
}
public String getObsResDir(String tenantCode) {
return String.format("%s/" + RESOURCE_TYPE_FILE, getObsTenantDir(tenantCode));
}
public String getObsUdfDir(String tenantCode) {
return String.format("%s/" + RESOURCE_TYPE_UDF, getObsTenantDir(tenantCode));
}
public String getObsTenantDir(String tenantCode) {
return String.format(FORMAT_S_S, getObsDataBasePath(), tenantCode);
}
public String getObsDataBasePath() {
if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) {
return "";
} else {
return RESOURCE_UPLOAD_PATH.replaceFirst(FOLDER_SEPARATOR, "");
}
}
protected void deleteTenantCode(String tenantCode) {
deleteDir(getResDir(tenantCode));
deleteDir(getUdfDir(tenantCode));
}
public void ensureBucketSuccessfullyCreated(String bucketName) {
if (StringUtils.isBlank(bucketName)) {
throw new IllegalArgumentException("resource.huawei.cloud.obs.bucket.name is empty");
}
boolean existsBucket = obsClient.headBucket(bucketName);
if (!existsBucket) {
throw new IllegalArgumentException(
"bucketName: " + bucketName + " does not exist, you need to create it yourself");
}
log.info("bucketName: {} has been found", bucketName);
}
protected void deleteDir(String directoryName) {
if (obsClient.doesObjectExist(bucketName, directoryName)) {
obsClient.deleteObject(bucketName, directoryName);
}
}
protected ObsClient buildObsClient() {
return new ObsClient(accessKeyId, accessKeySecret, endPoint);
}
private String findDirAlias(String dirPath) {
if (!dirPath.endsWith(FOLDER_SEPARATOR)) {
return dirPath;
}
Path path = Paths.get(dirPath);
return path.getName(path.getNameCount() - 1) + FOLDER_SEPARATOR;
}
}
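The `listFilesStatus` method above pages through the bucket with a marker loop: fetch a page of objects, collect them, and stop when `getNextMarker()` comes back blank. A minimal self-contained sketch of that loop, with a plain list standing in for `ObsClient.listObjects` — all names here (`MarkerPaginationSketch`, `Page`, `listPage`) are illustrative, not part of the OBS SDK:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MarkerPaginationSketch {

    // One page of results plus the marker for the next page ("" means done),
    // mirroring ObjectListing.getObjects() / getNextMarker().
    static class Page {
        final List<String> keys;
        final String nextMarker;
        Page(List<String> keys, String nextMarker) {
            this.keys = keys;
            this.nextMarker = nextMarker;
        }
    }

    // Returns at most pageSize keys starting after the given marker.
    static Page listPage(List<String> allKeys, String marker, int pageSize) {
        int start = marker.isEmpty() ? 0 : allKeys.indexOf(marker) + 1;
        int end = Math.min(start + pageSize, allKeys.size());
        List<String> keys = allKeys.subList(start, end);
        String next = end < allKeys.size() ? keys.get(keys.size() - 1) : "";
        return new Page(keys, next);
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("a", "b", "c", "d", "e");
        List<String> collected = new ArrayList<>();
        String marker = "";
        // Same shape as the while-loop in listFilesStatus: fetch a page,
        // collect its entries, advance the marker, break when it is blank.
        while (true) {
            Page page = listPage(all, marker, 2);
            collected.addAll(page.keys);
            if (page.nextMarker.isEmpty()) {
                break;
            }
            marker = page.nextMarker;
        }
        System.out.println(collected); // prints [a, b, c, d, e]
    }
}
```

The real method additionally splits each page into file objects and `getCommonPrefixes()` directories, but the termination condition is the same blank-marker check.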

43
dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.storage.obs;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory;
import org.apache.dolphinscheduler.plugin.storage.api.StorageType;
import com.google.auto.service.AutoService;
@AutoService({StorageOperateFactory.class})
public class ObsStorageOperatorFactory implements StorageOperateFactory {
public ObsStorageOperatorFactory() {
}
@Override
public StorageOperate createStorageOperate() {
ObsStorageOperator obsOperator = new ObsStorageOperator();
obsOperator.init();
return obsOperator;
}
@Override
public StorageType getStorageOperate() {
return StorageType.OBS;
}
}
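For the factory to be picked up, `resource.storage.type` must select OBS in `common.properties`. A hedged sketch of the relevant entries — the credential keys match `TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_ID`/`_SECRET` added in this PR, while the bucket and endpoint key names are assumed to follow the same `resource.huawei.cloud.obs.*` pattern (the exact strings live in `Constants.java`, whose hunk is not shown here):

```properties
# select OBS as the resource center storage backend
resource.storage.type=OBS

# credentials, per TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_ID / _SECRET
resource.huawei.cloud.access.key.id=<your-access-key-id>
resource.huawei.cloud.access.key.secret=<your-access-key-secret>

# bucket and endpoint; key names assumed from the Constants naming pattern,
# values are placeholders for your own bucket/region
resource.huawei.cloud.obs.bucket.name=dolphinscheduler
resource.huawei.cloud.obs.endpoint=obs.cn-north-4.myhuaweicloud.com
```

Note that `ensureBucketSuccessfullyCreated` only verifies the bucket via `headBucket`; the bucket itself must be created in the OBS console beforehand.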

291
dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java

@@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.storage.obs;
import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.obs.services.ObsClient;
@ExtendWith(MockitoExtension.class)
public class ObsStorageOperatorTest {
private static final String ACCESS_KEY_ID_MOCK = "ACCESS_KEY_ID_MOCK";
private static final String ACCESS_KEY_SECRET_MOCK = "ACCESS_KEY_SECRET_MOCK";
private static final String END_POINT_MOCK = "END_POINT_MOCK";
private static final String BUCKET_NAME_MOCK = "BUCKET_NAME_MOCK";
private static final String TENANT_CODE_MOCK = "TENANT_CODE_MOCK";
private static final String DIR_MOCK = "DIR_MOCK";
private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK";
private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK";
private static final String FULL_NAME = "/tmp/dir1/";
private static final String DEFAULT_PATH = "/tmp/";
@Mock
private ObsClient obsClientMock;
private ObsStorageOperator obsOperator;
@BeforeEach
public void setUp() throws Exception {
obsOperator = spy(new ObsStorageOperator());
doReturn(ACCESS_KEY_ID_MOCK).when(obsOperator)
.readObsAccessKeyID();
doReturn(ACCESS_KEY_SECRET_MOCK).when(obsOperator)
.readObsAccessKeySecret();
doReturn(BUCKET_NAME_MOCK).when(obsOperator).readObsBucketName();
doReturn(END_POINT_MOCK).when(obsOperator).readObsEndPoint();
doReturn(obsClientMock).when(obsOperator).buildObsClient();
doNothing().when(obsOperator).ensureBucketSuccessfullyCreated(any());
obsOperator.init();
}
@Test
public void initObsOperator() {
verify(obsOperator, times(1)).buildObsClient();
Assertions.assertEquals(ACCESS_KEY_ID_MOCK, obsOperator.getAccessKeyId());
Assertions.assertEquals(ACCESS_KEY_SECRET_MOCK, obsOperator.getAccessKeySecret());
Assertions.assertEquals(BUCKET_NAME_MOCK, obsOperator.getBucketName());
}
@Test
public void tearDownObsOperator() throws IOException {
doNothing().when(obsClientMock).close();
obsOperator.close();
verify(obsClientMock, times(1)).close();
}
@Test
public void createTenantResAndUdfDir() throws Exception {
doReturn(DIR_MOCK).when(obsOperator).getObsResDir(TENANT_CODE_MOCK);
doReturn(DIR_MOCK).when(obsOperator).getObsUdfDir(TENANT_CODE_MOCK);
doReturn(true).when(obsOperator).mkdir(TENANT_CODE_MOCK, DIR_MOCK);
obsOperator.createTenantDirIfNotExists(TENANT_CODE_MOCK);
verify(obsOperator, times(2)).mkdir(TENANT_CODE_MOCK, DIR_MOCK);
}
@Test
public void getResDir() {
final String expectedResourceDir = String.format("dolphinscheduler/%s/resources/", TENANT_CODE_MOCK);
final String dir = obsOperator.getResDir(TENANT_CODE_MOCK);
Assertions.assertEquals(expectedResourceDir, dir);
}
@Test
public void getUdfDir() {
final String expectedUdfDir = String.format("dolphinscheduler/%s/udfs/", TENANT_CODE_MOCK);
final String dir = obsOperator.getUdfDir(TENANT_CODE_MOCK);
Assertions.assertEquals(expectedUdfDir, dir);
}
@Test
public void mkdirWhenDirExists() {
boolean isSuccess = false;
try {
final String key = DIR_MOCK + FOLDER_SEPARATOR;
doReturn(true).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, key);
isSuccess = obsOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK);
verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, key);
} catch (IOException e) {
Assertions.fail("test failed due to unexpected IO exception");
}
Assertions.assertTrue(isSuccess);
}
@Test
public void mkdirWhenDirNotExists() {
boolean isSuccess = true;
try {
final String key = DIR_MOCK + FOLDER_SEPARATOR;
doReturn(false).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, key);
doNothing().when(obsOperator).createObsPrefix(BUCKET_NAME_MOCK, key);
isSuccess = obsOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK);
verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, key);
verify(obsOperator, times(1)).createObsPrefix(BUCKET_NAME_MOCK, key);
} catch (IOException e) {
Assertions.fail("test failed due to unexpected IO exception");
}
Assertions.assertTrue(isSuccess);
}
@Test
public void getResourceFullName() {
final String expectedResourceFullName =
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
final String resourceFullName = obsOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK);
Assertions.assertEquals(expectedResourceFullName, resourceFullName);
}
@Test
public void getResourceFileName() {
final String expectedResourceFileName = FILE_NAME_MOCK;
final String resourceFullName =
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
final String resourceFileName = obsOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName);
Assertions.assertEquals(expectedResourceFileName, resourceFileName);
}
@Test
public void getFileName() {
final String expectedFileName =
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
final String fileName = obsOperator.getFileName(ResourceType.FILE, TENANT_CODE_MOCK, FILE_NAME_MOCK);
Assertions.assertEquals(expectedFileName, fileName);
}
@Test
public void exists() {
boolean doesExist = false;
doReturn(true).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, FILE_NAME_MOCK);
try {
doesExist = obsOperator.exists(FILE_NAME_MOCK);
} catch (IOException e) {
Assertions.fail("unexpected IO exception in unit test");
}
Assertions.assertTrue(doesExist);
verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, FILE_NAME_MOCK);
}
@Test
public void delete() {
boolean isDeleted = false;
doReturn(null).when(obsClientMock).deleteObject(anyString(), anyString());
try {
isDeleted = obsOperator.delete(FILE_NAME_MOCK, true);
} catch (IOException e) {
Assertions.fail("unexpected IO exception in unit test");
}
Assertions.assertTrue(isDeleted);
verify(obsClientMock, times(1)).deleteObject(anyString(), anyString());
}
@Test
public void copy() {
boolean isSuccess = false;
doReturn(null).when(obsClientMock).copyObject(anyString(), anyString(), anyString(), anyString());
try {
isSuccess = obsOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false);
} catch (IOException e) {
Assertions.fail("unexpected IO exception in unit test");
}
Assertions.assertTrue(isSuccess);
verify(obsClientMock, times(1)).copyObject(anyString(), anyString(), anyString(), anyString());
}
@Test
public void deleteTenant() {
doNothing().when(obsOperator).deleteTenantCode(anyString());
try {
obsOperator.deleteTenant(TENANT_CODE_MOCK);
} catch (Exception e) {
Assertions.fail("unexpected exception caught in unit test");
}
verify(obsOperator, times(1)).deleteTenantCode(anyString());
}
@Test
public void getObsResDir() {
final String expectedObsResDir = String.format("dolphinscheduler/%s/resources", TENANT_CODE_MOCK);
final String obsResDir = obsOperator.getObsResDir(TENANT_CODE_MOCK);
Assertions.assertEquals(expectedObsResDir, obsResDir);
}
@Test
public void getObsUdfDir() {
final String expectedObsUdfDir = String.format("dolphinscheduler/%s/udfs", TENANT_CODE_MOCK);
final String obsUdfDir = obsOperator.getObsUdfDir(TENANT_CODE_MOCK);
Assertions.assertEquals(expectedObsUdfDir, obsUdfDir);
}
@Test
public void getObsTenantDir() {
final String expectedObsTenantDir = String.format(FORMAT_S_S, DIR_MOCK, TENANT_CODE_MOCK);
doReturn(DIR_MOCK).when(obsOperator).getObsDataBasePath();
final String obsTenantDir = obsOperator.getObsTenantDir(TENANT_CODE_MOCK);
Assertions.assertEquals(expectedObsTenantDir, obsTenantDir);
}
@Test
public void deleteDir() {
doReturn(true).when(obsClientMock).doesObjectExist(anyString(), anyString());
obsOperator.deleteDir(DIR_MOCK);
verify(obsClientMock, times(1)).deleteObject(anyString(), anyString());
}
@Test
public void testGetFileStatus() throws Exception {
StorageEntity entity = obsOperator.getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
Assertions.assertEquals(FULL_NAME, entity.getFullName());
Assertions.assertEquals("dir1/", entity.getFileName());
}
@Test
public void testListFilesStatus() throws Exception {
List<StorageEntity> result =
obsOperator.listFilesStatus("dolphinscheduler/default/resources/",
"dolphinscheduler/default/resources/",
"default", ResourceType.FILE);
Assertions.assertEquals(0, result.size());
}
@Test
public void testListFilesStatusRecursively() throws Exception {
StorageEntity entity = new StorageEntity();
entity.setFullName(FULL_NAME);
doReturn(entity).when(obsOperator).getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
doReturn(Collections.emptyList()).when(obsOperator).listFilesStatus(anyString(), anyString(), anyString(),
Mockito.any(ResourceType.class));
List<StorageEntity> result =
obsOperator.listFilesStatusRecursively(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
Assertions.assertEquals(0, result.size());
}
}

1
dolphinscheduler-storage-plugin/pom.xml

@@ -35,6 +35,7 @@
<module>dolphinscheduler-storage-oss</module>
<module>dolphinscheduler-storage-gcs</module>
<module>dolphinscheduler-storage-abs</module>
<module>dolphinscheduler-storage-obs</module>
</modules>
<dependencyManagement>

6
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

@@ -475,6 +475,12 @@ public class TaskConstants {
public static final String ALIBABA_CLOUD_ACCESS_KEY_SECRET = "resource.alibaba.cloud.access.key.secret";
public static final String ALIBABA_CLOUD_REGION = "resource.alibaba.cloud.region";
/**
* huawei cloud config
*/
public static final String HUAWEI_CLOUD_ACCESS_KEY_ID = "resource.huawei.cloud.access.key.id";
public static final String HUAWEI_CLOUD_ACCESS_KEY_SECRET = "resource.huawei.cloud.access.key.secret";
/**
* use for k8s task
*/

1
tools/dependencies/known-dependencies.txt

@@ -495,3 +495,4 @@ snowflake-jdbc-3.13.29.jar
azure-storage-blob-12.21.0.jar
azure-storage-internal-avro-12.6.0.jar
vertica-jdbc-12.0.4-0.jar
esdk-obs-java-bundle-3.23.3.jar