diff --git a/docs/docs/en/guide/resource/configuration.md b/docs/docs/en/guide/resource/configuration.md index d8e13770b2..a4bb8cef42 100644 --- a/docs/docs/en/guide/resource/configuration.md +++ b/docs/docs/en/guide/resource/configuration.md @@ -1,7 +1,7 @@ # Resource Center Configuration - You could use `Resource Center` to upload text files, UDFs and other task-related files. -- You could configure `Resource Center` to use distributed file system like [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), [MinIO](https://github.com/minio/minio) cluster or remote storage products like [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), etc. +- You could configure `Resource Center` to use distributed file system like [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), [MinIO](https://github.com/minio/minio) cluster or remote storage products like [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html) etc. - You could configure `Resource Center` to use local file system. If you deploy `DolphinScheduler` in `Standalone` mode, you could configure it to use local file system for `Resouce Center` without the need of an external `HDFS` system or `S3`. - Furthermore, if you deploy `DolphinScheduler` in `Cluster` mode, you could use [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) to mount `S3` or [JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html) to mount `OSS` to your machines and use the local file system for `Resouce Center`. In this way, you could operate remote files as if on your local machines. 
@@ -80,7 +80,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: HDFS, S3, OSS, GCS, ABS, NONE +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE resource.storage.type=NONE # resource store on HDFS/S3/OSS path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended resource.storage.upload.base.path=/tmp/dolphinscheduler @@ -107,6 +107,15 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler # oss bucket endpoint, required if you set resource.storage.type=OSS resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com +# huawei cloud access key id, required if you set resource.storage.type=OBS +resource.huawei.cloud.access.key.id= +# huawei cloud access key secret, required if you set resource.storage.type=OBS +resource.huawei.cloud.access.key.secret= +# obs bucket name, required if you set resource.storage.type=OBS +resource.huawei.cloud.obs.bucket.name=dolphinscheduler +# obs bucket endpoint, required if you set resource.storage.type=OBS +resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com + # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path resource.hdfs.root.user=hdfs # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir diff --git a/docs/docs/zh/guide/resource/configuration.md b/docs/docs/zh/guide/resource/configuration.md index 43ee13966e..25bacf90a0 100644 --- a/docs/docs/zh/guide/resource/configuration.md +++ b/docs/docs/zh/guide/resource/configuration.md @@ -1,7 +1,7 @@ # 资源中心配置详情 - 资源中心通常用于上传文件、UDF 函数,以及任务组管理等操作。 - 
资源中心可以对接分布式的文件存储系统,如[Hadoop](https://hadoop.apache.org/docs/r2.7.0/)(2.6+)或者[MinIO](https://github.com/minio/minio)集群,也可以对接远端的对象存储,如[AWS S3](https://aws.amazon.com/s3/)或者[阿里云 OSS](https://www.aliyun.com/product/oss)等。 +- 资源中心可以对接分布式的文件存储系统,如[Hadoop](https://hadoop.apache.org/docs/r2.7.0/)(2.6+)或者[MinIO](https://github.com/minio/minio)集群,也可以对接远端的对象存储,如[AWS S3](https://aws.amazon.com/s3/)或者[阿里云 OSS](https://www.aliyun.com/product/oss),[华为云 OBS](https://support.huaweicloud.com/obs/index.html) 等。 - 资源中心也可以直接对接本地文件系统。在单机模式下,您无需依赖`Hadoop`或`S3`一类的外部存储系统,可以方便地对接本地文件系统进行体验。 - 除此之外,对于集群模式下的部署,您可以通过使用[S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse)将`S3`挂载到本地,或者使用[JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html)将`OSS`挂载到本地等,再用资源中心对接本地文件系统方式来操作远端对象存储中的文件。 @@ -79,7 +79,7 @@ resource.aws.s3.endpoint= # user data local directory path, please make sure the directory exists and have read write permissions data.basedir.path=/tmp/dolphinscheduler -# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE resource.storage.type=LOCAL # resource store on HDFS/S3/OSS path, resource file will store to this hadoop hdfs path, self configuration, @@ -108,6 +108,15 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler # oss bucket endpoint, required if you set resource.storage.type=OSS resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com +# huawei cloud access key id, required if you set resource.storage.type=OBS +resource.huawei.cloud.access.key.id= +# huawei cloud access key secret, required if you set resource.storage.type=OBS +resource.huawei.cloud.access.key.secret= +# obs bucket name, required if you set resource.storage.type=OBS +resource.huawei.cloud.obs.bucket.name=dolphinscheduler +# obs bucket endpoint, required if you set resource.storage.type=OBS +resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com + # if resource.storage.type=HDFS, the user must have 
the permission to create directories under the HDFS root path resource.hdfs.root.user=root # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index 7af68b6701..8bb9481fc8 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -115,6 +115,7 @@ 1.6.0 1.2.10 3.17.2 + 3.23.3 1.2.1 @@ -896,6 +897,23 @@ protobuf-java ${protobuf.version} + + + com.huaweicloud + esdk-obs-java-bundle + ${esdk-obs.version} + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-api + + + + com.github.stefanbirkner system-lambda diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 096306fa8c..0254e6c43a 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -93,6 +93,11 @@ aws-java-sdk-s3 + + com.huaweicloud + esdk-obs-java-bundle + + com.github.oshi oshi-core diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index f2260c30cf..e98f5f202f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -147,6 +147,9 @@ public final class Constants { public static final String AZURE_BLOB_STORAGE_ACCOUNT_NAME = "resource.azure.blob.storage.account.name"; + public static final String HUAWEI_CLOUD_OBS_BUCKET_NAME = "resource.huawei.cloud.obs.bucket.name"; + public static final String HUAWEI_CLOUD_OBS_END_POINT = "resource.huawei.cloud.obs.endpoint"; + /** * fetch applicationId way */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java index 2ebc2e1a5c..143cb12a35 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java @@ -21,5 +21,5 @@ package org.apache.dolphinscheduler.common.enums; * data base types */ public enum ResUploadType { - LOCAL, HDFS, S3, OSS, GCS, ABS, NONE + LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE } diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 43a1338152..ea4c126beb 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE. 
LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration # please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless # use shared file mount point resource.storage.type=LOCAL @@ -73,6 +73,17 @@ resource.azure.blob.storage.account.name= # abs connection string, required if you set resource.storage.type=ABS resource.azure.blob.storage.connection.string= + +# huawei cloud access key id, required if you set resource.storage.type=OBS +resource.huawei.cloud.access.key.id= +# huawei cloud access key secret, required if you set resource.storage.type=OBS +resource.huawei.cloud.access.key.secret= +# obs bucket name, required if you set resource.storage.type=OBS +resource.huawei.cloud.obs.bucket.name=dolphinscheduler +# obs bucket endpoint, required if you set resource.storage.type=OBS +resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com + + # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path resource.hdfs.root.user=hdfs # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index ffc1a39ec6..ab26aeda4a 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -567,6 +567,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
casdoor-spring-boot-starter 1.6.0 https://mvnrepository.com/artifact/org.casbin/casdoor-spring-boot-starter/1.6.0 Apache 2.0 org.apache.oltu.oauth2.client 1.0.2 https://mvnrepository.com/artifact/org.apache.oltu.oauth2/org.apache.oltu.oauth2.client/1.0.2 Apache 2.0 org.apache.oltu.oauth2.common 1.0.2 https://mvnrepository.com/artifact/org.apache.oltu.oauth2/org.apache.oltu.oauth2.common/1.0.2 Apache 2.0 + esdk-obs-java-bundle 3.23.3 https://mvnrepository.com/artifact/com.huaweicloud/esdk-obs-java-bundle/3.23.3 Apache 2.0 diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-esdk-obs-java-bundle.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml index a85fcd3c4e..4d6edcd1cc 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml @@ -57,6 +57,11 @@ dolphinscheduler-storage-abs ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-storage-obs + ${project.version} + diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java index 33c34f4c53..7ead2c8a94 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java @@ -27,7 +27,9 @@ public enum StorageType { S3(3, "S3"), GCS(4, "GCS"), - ABS(5, "ABS"); + ABS(5, "ABS"), + + OBS(6, "OBS"); private final int code; private final String name; diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml 
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml new file mode 100644 index 0000000000..d19923498b --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/pom.xml @@ -0,0 +1,46 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-storage-plugin + dev-SNAPSHOT + + + dolphinscheduler-storage-obs + + + + com.huaweicloud + esdk-obs-java-bundle + + + org.apache.dolphinscheduler + dolphinscheduler-storage-api + ${project.version} + + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java new file mode 100644 index 0000000000..6dc5318519 --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperator.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.storage.obs; + +import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S; +import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_FILE; +import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_UDF; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ResUploadType; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.spi.enums.ResourceType; + +import org.apache.commons.lang3.StringUtils; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import com.obs.services.ObsClient; +import com.obs.services.exception.ObsException; +import com.obs.services.internal.ServiceException; +import com.obs.services.model.DeleteObjectsRequest; +import com.obs.services.model.GetObjectRequest; +import com.obs.services.model.ListObjectsRequest; +import com.obs.services.model.ObjectListing; +import com.obs.services.model.ObjectMetadata; +import com.obs.services.model.ObsObject; +import 
com.obs.services.model.PutObjectRequest; + +@Data +@Slf4j +public class ObsStorageOperator implements Closeable, StorageOperate { + + private String accessKeyId; + + private String accessKeySecret; + + private String bucketName; + + private String endPoint; + + private ObsClient obsClient; + + public ObsStorageOperator() { + } + + public void init() { + this.accessKeyId = readObsAccessKeyID(); + this.accessKeySecret = readObsAccessKeySecret(); + this.endPoint = readObsEndPoint(); + this.bucketName = readObsBucketName(); + this.obsClient = buildObsClient(); + ensureBucketSuccessfullyCreated(bucketName); + } + + protected String readObsAccessKeyID() { + return PropertyUtils.getString(TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_ID); + } + + protected String readObsAccessKeySecret() { + return PropertyUtils.getString(TaskConstants.HUAWEI_CLOUD_ACCESS_KEY_SECRET); + } + + protected String readObsBucketName() { + return PropertyUtils.getString(Constants.HUAWEI_CLOUD_OBS_BUCKET_NAME); + } + + protected String readObsEndPoint() { + return PropertyUtils.getString(Constants.HUAWEI_CLOUD_OBS_END_POINT); + } + + @Override + public void close() throws IOException { + obsClient.close(); + } + + @Override + public void createTenantDirIfNotExists(String tenantCode) throws Exception { + mkdir(tenantCode, getObsResDir(tenantCode)); + mkdir(tenantCode, getObsUdfDir(tenantCode)); + } + + @Override + public String getResDir(String tenantCode) { + return getObsResDir(tenantCode) + FOLDER_SEPARATOR; + } + + @Override + public String getUdfDir(String tenantCode) { + return getObsUdfDir(tenantCode) + FOLDER_SEPARATOR; + } + + @Override + public boolean mkdir(String tenantCode, String path) throws IOException { + final String key = path + FOLDER_SEPARATOR; + if (!obsClient.doesObjectExist(bucketName, key)) { + createObsPrefix(bucketName, key); + } + return true; + } + + protected void createObsPrefix(final String bucketName, final String key) { + ObjectMetadata metadata = new ObjectMetadata(); + 
metadata.setContentLength(0L); + InputStream emptyContent = new ByteArrayInputStream(new byte[0]); + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, emptyContent); + obsClient.putObject(putObjectRequest); + } + + @Override + public String getResourceFullName(String tenantCode, String fileName) { + if (fileName.startsWith(FOLDER_SEPARATOR)) { + fileName = fileName.replaceFirst(FOLDER_SEPARATOR, ""); + } + return String.format(FORMAT_S_S, getObsResDir(tenantCode), fileName); + } + + @Override + public String getResourceFileName(String tenantCode, String fullName) { + String resDir = getResDir(tenantCode); + return fullName.replaceFirst(resDir, ""); + } + + @Override + public String getFileName(ResourceType resourceType, String tenantCode, String fileName) { + if (fileName.startsWith(FOLDER_SEPARATOR)) { + fileName = fileName.replaceFirst(FOLDER_SEPARATOR, ""); + } + return getDir(resourceType, tenantCode) + fileName; + } + + @Override + public boolean delete(String fullName, List childrenPathList, boolean recursive) throws IOException { + // append the resource fullName to the list for deletion. 
+ childrenPathList.add(fullName); + + DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName); + for (String deleteKys : childrenPathList) { + deleteObjectsRequest.addKeyAndVersion(deleteKys); + } + + try { + obsClient.deleteObjects(deleteObjectsRequest); + } catch (Exception e) { + log.error("delete objects error", e); + return false; + } + + return true; + } + + @Override + public void download(String tenantCode, String srcFilePath, String dstFilePath, + boolean overwrite) throws IOException { + File dstFile = new File(dstFilePath); + if (dstFile.isDirectory()) { + Files.delete(dstFile.toPath()); + } else { + Files.createDirectories(dstFile.getParentFile().toPath()); + } + ObsObject obsObject = obsClient.getObject(bucketName, srcFilePath); + try ( + InputStream obsInputStream = obsObject.getObjectContent(); + FileOutputStream fos = new FileOutputStream(dstFilePath)) { + byte[] readBuf = new byte[1024]; + int readLen; + while ((readLen = obsInputStream.read(readBuf)) > 0) { + fos.write(readBuf, 0, readLen); + } + } catch (ObsException e) { + throw new IOException(e); + } catch (FileNotFoundException e) { + log.error("cannot find the destination file {}", dstFilePath); + throw e; + } + } + + @Override + public boolean exists(String fileName) throws IOException { + return obsClient.doesObjectExist(bucketName, fileName); + } + + @Override + public boolean delete(String filePath, boolean recursive) throws IOException { + try { + obsClient.deleteObject(bucketName, filePath); + return true; + } catch (ObsException e) { + log.error("fail to delete the object, the resource path is {}", filePath, e); + return false; + } + } + + @Override + public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException { + obsClient.copyObject(bucketName, srcPath, bucketName, dstPath); + if (deleteSource) { + obsClient.deleteObject(bucketName, srcPath); + } + return true; + } + + @Override + public String 
getDir(ResourceType resourceType, String tenantCode) { + switch (resourceType) { + case UDF: + return getUdfDir(tenantCode); + case FILE: + return getResDir(tenantCode); + case ALL: + return getObsDataBasePath(); + default: + return ""; + } + } + + @Override + public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource, + boolean overwrite) throws IOException { + try { + obsClient.putObject(bucketName, dstPath, new File(srcFile)); + if (deleteSource) { + Files.delete(Paths.get(srcFile)); + } + return true; + } catch (ObsException e) { + log.error("upload failed, the bucketName is {}, the filePath is {}", bucketName, dstPath, e); + return false; + } + } + + @Override + public List vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException { + if (StringUtils.isBlank(filePath)) { + log.error("file path:{} is empty", filePath); + return Collections.emptyList(); + } + ObsObject obsObject = obsClient.getObject(bucketName, filePath); + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(obsObject.getObjectContent()))) { + Stream stream = bufferedReader.lines().skip(skipLineNums).limit(limit); + return stream.collect(Collectors.toList()); + } + } + + @Override + public ResUploadType returnStorageType() { + return ResUploadType.OBS; + } + + @Override + public List listFilesStatusRecursively(String path, String defaultPath, String tenantCode, + ResourceType type) { + List storageEntityList = new ArrayList<>(); + LinkedList foldersToFetch = new LinkedList<>(); + + StorageEntity initialEntity = null; + try { + initialEntity = getFileStatus(path, defaultPath, tenantCode, type); + } catch (Exception e) { + log.error("error while listing files status recursively, path: {}", path, e); + return storageEntityList; + } + foldersToFetch.add(initialEntity); + + while (!foldersToFetch.isEmpty()) { + String pathToExplore = foldersToFetch.pop().getFullName(); + try { + List tempList = 
listFilesStatus(pathToExplore, defaultPath, tenantCode, type); + for (StorageEntity temp : tempList) { + if (temp.isDirectory()) { + foldersToFetch.add(temp); + } + } + storageEntityList.addAll(tempList); + } catch (Exception e) { + log.error("error while listing files stat:wus recursively, path: {}", pathToExplore, e); + } + } + + return storageEntityList; + } + + @Override + public List listFilesStatus(String path, String defaultPath, String tenantCode, + ResourceType type) throws Exception { + List storageEntityList = new ArrayList<>(); + + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucketName); + request.setPrefix(path); + request.setDelimiter(FOLDER_SEPARATOR); + ObjectListing result = null; + try { + result = obsClient.listObjects(request); + } catch (Exception e) { + throw new ServiceException("Get ObsClient file list exception", e); + } + + while (result != null) { + String nextMarker = result.getNextMarker(); + List objects = result.getObjects(); + + for (ObsObject object : objects) { + if (!object.getObjectKey().endsWith(FOLDER_SEPARATOR)) { + // the path is a file + String[] aliasArr = object.getObjectKey().split(FOLDER_SEPARATOR); + String alias = aliasArr[aliasArr.length - 1]; + String fileName = StringUtils.difference(defaultPath, object.getObjectKey()); + + StorageEntity entity = new StorageEntity(); + ObjectMetadata metadata = object.getMetadata(); + entity.setAlias(alias); + entity.setFileName(fileName); + entity.setFullName(object.getObjectKey()); + entity.setDirectory(false); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(metadata.getContentLength()); + entity.setCreateTime(metadata.getLastModified()); + entity.setUpdateTime(metadata.getLastModified()); + entity.setPfullName(path); + + storageEntityList.add(entity); + } + } + + for (String commonPrefix : result.getCommonPrefixes()) { + // the paths in commonPrefix are directories + String suffix = StringUtils.difference(path, 
commonPrefix); + String fileName = StringUtils.difference(defaultPath, commonPrefix); + + StorageEntity entity = new StorageEntity(); + entity.setAlias(suffix); + entity.setFileName(fileName); + entity.setFullName(commonPrefix); + entity.setDirectory(true); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(0); + entity.setCreateTime(null); + entity.setUpdateTime(null); + entity.setPfullName(path); + + storageEntityList.add(entity); + } + + if (!StringUtils.isNotBlank(nextMarker)) { + break; + } + + request.setMarker(nextMarker); + try { + result = obsClient.listObjects(request); + } catch (Exception e) { + throw new ServiceException("Get ObsClient file list exception", e); + } + } + return storageEntityList; + } + + @Override + public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode, + ResourceType type) throws Exception { + + if (path.endsWith(FOLDER_SEPARATOR)) { + // the path is a directory that may or may not exist in ObsClient + String alias = findDirAlias(path); + String fileName = StringUtils.difference(defaultPath, path); + + StorageEntity entity = new StorageEntity(); + entity.setAlias(alias); + entity.setFileName(fileName); + entity.setFullName(path); + entity.setDirectory(true); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(0); + + return entity; + + } else { + GetObjectRequest request = new GetObjectRequest(); + request.setBucketName(bucketName); + request.setObjectKey(path); + ObsObject object; + try { + object = obsClient.getObject(request); + } catch (Exception e) { + throw new ServiceException("Get ObsClient file list exception", e); + } + + String[] aliasArr = object.getObjectKey().split(FOLDER_SEPARATOR); + String alias = aliasArr[aliasArr.length - 1]; + String fileName = StringUtils.difference(defaultPath, object.getObjectKey()); + + StorageEntity entity = new StorageEntity(); + ObjectMetadata metadata = object.getMetadata(); + entity.setAlias(alias); + 
entity.setFileName(fileName); + entity.setFullName(object.getObjectKey()); + entity.setDirectory(false); + entity.setUserName(tenantCode); + entity.setType(type); + entity.setSize(metadata.getContentLength()); + entity.setCreateTime(metadata.getLastModified()); + entity.setUpdateTime(metadata.getLastModified()); + + return entity; + } + + } + + @Override + public void deleteTenant(String tenantCode) throws Exception { + deleteTenantCode(tenantCode); + } + + public String getObsResDir(String tenantCode) { + return String.format("%s/" + RESOURCE_TYPE_FILE, getObsTenantDir(tenantCode)); + } + + public String getObsUdfDir(String tenantCode) { + return String.format("%s/" + RESOURCE_TYPE_UDF, getObsTenantDir(tenantCode)); + } + + public String getObsTenantDir(String tenantCode) { + return String.format(FORMAT_S_S, getObsDataBasePath(), tenantCode); + } + + public String getObsDataBasePath() { + if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) { + return ""; + } else { + return RESOURCE_UPLOAD_PATH.replaceFirst(FOLDER_SEPARATOR, ""); + } + } + + protected void deleteTenantCode(String tenantCode) { + deleteDir(getResDir(tenantCode)); + deleteDir(getUdfDir(tenantCode)); + } + + public void ensureBucketSuccessfullyCreated(String bucketName) { + if (StringUtils.isBlank(bucketName)) { + throw new IllegalArgumentException("resource.alibaba.cloud.obs.bucket.name is empty"); + } + + boolean existsBucket = obsClient.headBucket(bucketName); + if (!existsBucket) { + throw new IllegalArgumentException( + "bucketName: " + bucketName + " is not exists, you need to create them by yourself"); + } + + log.info("bucketName: {} has been found", bucketName); + } + + protected void deleteDir(String directoryName) { + if (obsClient.doesObjectExist(bucketName, directoryName)) { + obsClient.deleteObject(bucketName, directoryName); + } + } + + protected ObsClient buildObsClient() { + return new ObsClient(accessKeyId, accessKeySecret, endPoint); + } + + private String findDirAlias(String 
dirPath) { + if (!dirPath.endsWith(FOLDER_SEPARATOR)) { + return dirPath; + } + + Path path = Paths.get(dirPath); + return path.getName(path.getNameCount() - 1) + FOLDER_SEPARATOR; + } +} diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java new file mode 100644 index 0000000000..2e67103931 --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/main/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.plugin.storage.obs; + +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory; +import org.apache.dolphinscheduler.plugin.storage.api.StorageType; + +import com.google.auto.service.AutoService; + +@AutoService({StorageOperateFactory.class}) +public class ObsStorageOperatorFactory implements StorageOperateFactory { + + public ObsStorageOperatorFactory() { + } + + @Override + public StorageOperate createStorageOperate() { + ObsStorageOperator ossOperator = new ObsStorageOperator(); + ossOperator.init(); + return ossOperator; + } + + @Override + public StorageType getStorageOperate() { + return StorageType.OBS; + } +} diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java new file mode 100644 index 0000000000..4aabdb6bd8 --- /dev/null +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/java/org/apache/dolphinscheduler/plugin/storage/obs/ObsStorageOperatorTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.storage.obs; + +import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; +import org.apache.dolphinscheduler.spi.enums.ResourceType; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.obs.services.ObsClient; + +@ExtendWith(MockitoExtension.class) +public class ObsStorageOperatorTest { + + private static final String ACCESS_KEY_ID_MOCK = "ACCESS_KEY_ID_MOCK"; + private static final String ACCESS_KEY_SECRET_MOCK = "ACCESS_KEY_SECRET_MOCK"; + private static final String END_POINT_MOCK = "END_POINT_MOCK"; + private static final String BUCKET_NAME_MOCK = "BUCKET_NAME_MOCK"; + private static final String TENANT_CODE_MOCK = "TENANT_CODE_MOCK"; + private static final String DIR_MOCK = 
"DIR_MOCK"; + private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK"; + private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK"; + private static final String FULL_NAME = "/tmp/dir1/"; + + private static final String DEFAULT_PATH = "/tmp/"; + @Mock + private ObsClient obsClientMock; + + private ObsStorageOperator obsOperator; + + @BeforeEach + public void setUp() throws Exception { + obsOperator = spy(new ObsStorageOperator()); + doReturn(ACCESS_KEY_ID_MOCK).when(obsOperator) + .readObsAccessKeyID(); + doReturn(ACCESS_KEY_SECRET_MOCK).when(obsOperator) + .readObsAccessKeySecret(); + doReturn(BUCKET_NAME_MOCK).when(obsOperator).readObsBucketName(); + doReturn(END_POINT_MOCK).when(obsOperator).readObsEndPoint(); + doReturn(obsClientMock).when(obsOperator).buildObsClient(); + doNothing().when(obsOperator).ensureBucketSuccessfullyCreated(any()); + + obsOperator.init(); + + } + + @Test + public void initObsOperator() { + verify(obsOperator, times(1)).buildObsClient(); + Assertions.assertEquals(ACCESS_KEY_ID_MOCK, obsOperator.getAccessKeyId()); + Assertions.assertEquals(ACCESS_KEY_SECRET_MOCK, obsOperator.getAccessKeySecret()); + Assertions.assertEquals(BUCKET_NAME_MOCK, obsOperator.getBucketName()); + } + + @Test + public void tearDownObsOperator() throws IOException { + doNothing().when(obsClientMock).close(); + obsOperator.close(); + verify(obsClientMock, times(1)).close(); + } + + @Test + public void createTenantResAndUdfDir() throws Exception { + doReturn(DIR_MOCK).when(obsOperator).getObsResDir(TENANT_CODE_MOCK); + doReturn(DIR_MOCK).when(obsOperator).getObsUdfDir(TENANT_CODE_MOCK); + doReturn(true).when(obsOperator).mkdir(TENANT_CODE_MOCK, DIR_MOCK); + obsOperator.createTenantDirIfNotExists(TENANT_CODE_MOCK); + verify(obsOperator, times(2)).mkdir(TENANT_CODE_MOCK, DIR_MOCK); + } + + @Test + public void getResDir() { + final String expectedResourceDir = String.format("dolphinscheduler/%s/resources/", TENANT_CODE_MOCK); + final String dir = 
obsOperator.getResDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedResourceDir, dir); + } + + @Test + public void getUdfDir() { + final String expectedUdfDir = String.format("dolphinscheduler/%s/udfs/", TENANT_CODE_MOCK); + final String dir = obsOperator.getUdfDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedUdfDir, dir); + } + + @Test + public void mkdirWhenDirExists() { + boolean isSuccess = false; + try { + final String key = DIR_MOCK + FOLDER_SEPARATOR; + doReturn(true).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, key); + isSuccess = obsOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK); + verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, key); + + } catch (IOException e) { + Assertions.fail("test failed due to unexpected IO exception"); + } + + Assertions.assertTrue(isSuccess); + } + + @Test + public void mkdirWhenDirNotExists() { + boolean isSuccess = true; + try { + final String key = DIR_MOCK + FOLDER_SEPARATOR; + doReturn(false).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, key); + doNothing().when(obsOperator).createObsPrefix(BUCKET_NAME_MOCK, key); + isSuccess = obsOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK); + verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, key); + verify(obsOperator, times(1)).createObsPrefix(BUCKET_NAME_MOCK, key); + + } catch (IOException e) { + Assertions.fail("test failed due to unexpected IO exception"); + } + + Assertions.assertTrue(isSuccess); + } + + @Test + public void getResourceFullName() { + final String expectedResourceFullName = + String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String resourceFullName = obsOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK); + Assertions.assertEquals(expectedResourceFullName, resourceFullName); + } + + @Test + public void getResourceFileName() { + final String expectedResourceFileName = FILE_NAME_MOCK; + final String resourceFullName = + 
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String resourceFileName = obsOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName); + Assertions.assertEquals(expectedResourceFileName, resourceFileName); + } + + @Test + public void getFileName() { + final String expectedFileName = + String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); + final String fileName = obsOperator.getFileName(ResourceType.FILE, TENANT_CODE_MOCK, FILE_NAME_MOCK); + Assertions.assertEquals(expectedFileName, fileName); + } + + @Test + public void exists() { + boolean doesExist = false; + doReturn(true).when(obsClientMock).doesObjectExist(BUCKET_NAME_MOCK, FILE_NAME_MOCK); + try { + doesExist = obsOperator.exists(FILE_NAME_MOCK); + } catch (IOException e) { + Assertions.fail("unexpected IO exception in unit test"); + } + + Assertions.assertTrue(doesExist); + verify(obsClientMock, times(1)).doesObjectExist(BUCKET_NAME_MOCK, FILE_NAME_MOCK); + } + + @Test + public void delete() { + boolean isDeleted = false; + doReturn(null).when(obsClientMock).deleteObject(anyString(), anyString()); + try { + isDeleted = obsOperator.delete(FILE_NAME_MOCK, true); + } catch (IOException e) { + Assertions.fail("unexpected IO exception in unit test"); + } + + Assertions.assertTrue(isDeleted); + verify(obsClientMock, times(1)).deleteObject(anyString(), anyString()); + } + + @Test + public void copy() { + boolean isSuccess = false; + doReturn(null).when(obsClientMock).copyObject(anyString(), anyString(), anyString(), anyString()); + try { + isSuccess = obsOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false); + } catch (IOException e) { + Assertions.fail("unexpected IO exception in unit test"); + } + + Assertions.assertTrue(isSuccess); + verify(obsClientMock, times(1)).copyObject(anyString(), anyString(), anyString(), anyString()); + } + + @Test + public void deleteTenant() { + 
doNothing().when(obsOperator).deleteTenantCode(anyString()); + try { + obsOperator.deleteTenant(TENANT_CODE_MOCK); + } catch (Exception e) { + Assertions.fail("unexpected exception caught in unit test"); + } + + verify(obsOperator, times(1)).deleteTenantCode(anyString()); + } + + @Test + public void getObsResDir() { + final String expectedObsResDir = String.format("dolphinscheduler/%s/resources", TENANT_CODE_MOCK); + final String obsResDir = obsOperator.getObsResDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedObsResDir, obsResDir); + } + + @Test + public void getObsUdfDir() { + final String expectedObsUdfDir = String.format("dolphinscheduler/%s/udfs", TENANT_CODE_MOCK); + final String obsUdfDir = obsOperator.getObsUdfDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedObsUdfDir, obsUdfDir); + } + + @Test + public void getObsTenantDir() { + final String expectedObsTenantDir = String.format(FORMAT_S_S, DIR_MOCK, TENANT_CODE_MOCK); + doReturn(DIR_MOCK).when(obsOperator).getObsDataBasePath(); + final String obsTenantDir = obsOperator.getObsTenantDir(TENANT_CODE_MOCK); + Assertions.assertEquals(expectedObsTenantDir, obsTenantDir); + } + + @Test + public void deleteDir() { + doReturn(true).when(obsClientMock).doesObjectExist(anyString(), anyString()); + obsOperator.deleteDir(DIR_MOCK); + verify(obsClientMock, times(1)).deleteObject(anyString(), anyString()); + } + + @Test + public void testGetFileStatus() throws Exception { + StorageEntity entity = obsOperator.getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); + Assertions.assertEquals(FULL_NAME, entity.getFullName()); + Assertions.assertEquals("dir1/", entity.getFileName()); + } + + @Test + public void testListFilesStatus() throws Exception { + List result = + obsOperator.listFilesStatus("dolphinscheduler/default/resources/", + "dolphinscheduler/default/resources/", + "default", ResourceType.FILE); + Assertions.assertEquals(0, result.size()); + } + + @Test + public void 
testListFilesStatusRecursively() throws Exception { + StorageEntity entity = new StorageEntity(); + entity.setFullName(FULL_NAME); + + doReturn(entity).when(obsOperator).getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); + doReturn(Collections.EMPTY_LIST).when(obsOperator).listFilesStatus(anyString(), anyString(), anyString(), + Mockito.any(ResourceType.class)); + + List result = + obsOperator.listFilesStatusRecursively(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); + Assertions.assertEquals(0, result.size()); + } +} diff --git a/dolphinscheduler-storage-plugin/pom.xml b/dolphinscheduler-storage-plugin/pom.xml index 7e27358a64..a6a86bc2a6 100644 --- a/dolphinscheduler-storage-plugin/pom.xml +++ b/dolphinscheduler-storage-plugin/pom.xml @@ -35,6 +35,7 @@ dolphinscheduler-storage-oss dolphinscheduler-storage-gcs dolphinscheduler-storage-abs + dolphinscheduler-storage-obs diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index 2d314382c5..ab46bb58d5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -475,6 +475,12 @@ public class TaskConstants { public static final String ALIBABA_CLOUD_ACCESS_KEY_SECRET = "resource.alibaba.cloud.access.key.secret"; public static final String ALIBABA_CLOUD_REGION = "resource.alibaba.cloud.region"; + /** + * huawei cloud config + */ + public static final String HUAWEI_CLOUD_ACCESS_KEY_ID = "resource.huawei.cloud.access.key.id"; + public static final String HUAWEI_CLOUD_ACCESS_KEY_SECRET = "resource.huawei.cloud.access.key.secret"; + /** * 
use for k8s task */ diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 28d2f1fe70..23fb7919d9 100644 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -495,3 +495,4 @@ snowflake-jdbc-3.13.29.jar azure-storage-blob-12.21.0.jar azure-storage-internal-avro-12.6.0.jar vertica-jdbc-12.0.4-0.jar +esdk-obs-java-bundle-3.23.3.jar \ No newline at end of file