diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/github.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/github.rst index 16febd4fcb..b3023377de 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/github.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/github.rst @@ -22,7 +22,7 @@ GitHub When using a github resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition, such as `resource_plugin=GitHub(prefix="https://github.com/xxx", access_token="ghpxx")`. -The token parameter is optional. You need to add it when your warehouse is a private repository. +The token parameter is optional. You need to add it when your repository is a private repository. You can view this `document `_ when creating a token. diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/gitlab.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/gitlab.rst new file mode 100644 index 0000000000..fdf43c9d2f --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/gitlab.rst @@ -0,0 +1,46 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +GitLab +====== + +`GitLab` is a gitlab resource plugin for pydolphinscheduler. + +When using a gitlab resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition, +such as `resource_plugin=GitLab(prefix="xxx")`, if it is a public repository. + +If it is a private or Internal repository, you can use three ways to obtain authentication. + +The first is `Personal Access Tokens`, using `resource_plugin=GitLab(prefix="xxx", private_token="xxx")`. + +The second method is to obtain authentication through `username` and `password`: + +using `resource_plugin=GitLab(prefix="xxx", username="username", password="pwd")`. + +The third method is to obtain authentication through `OAuth Token`: + +using `resource_plugin=GitLab(prefix="xxx", oauth_token="xx")`. + +You can view this `document `_ +when creating a `Personal Access Tokens`. + +For the specific use of resource plugins, you can see `How to use` in :doc:`resource-plugin` + +Dive Into +--------- + +.. automodule:: pydolphinscheduler.resources_plugin.gitlab \ No newline at end of file diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst index 1110cf315f..c984f06048 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst @@ -26,4 +26,7 @@ In this section develop resource-plugin local - github \ No newline at end of file + github + gitlab + oss + s3 \ No newline at end of file diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/oss.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/oss.rst new file mode 100644 index 0000000000..fbb6785d1d --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/oss.rst @@ -0,0 +1,44 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +OSS +=== + +`OSS` is a Aliyun OSS resource plugin for pydolphinscheduler. + +When using a OSS resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition, +such as `resource_plugin=OSS(prefix="xxx")`, if the file is publicly readable. + +When the file is private, using `resource_plugin=OSS(prefix="xxx", access_key_id="xxx", access_key_secret="xxx")` + +Notice +The read permission of files in a bucket is inherited from the bucket by default. In other words, if the bucket is private, +the files in it are also private. + +But the read permission of the files in the bucket can be changed, in other words, the files in the private bucket can also be read publicly. + +So whether the `AccessKey` is needed depends on whether the file is private or not. + +You can view this `document `_ +when creating a pair `AccessKey`. + +For the specific use of resource plugins, you can see `How to use` in :doc:`resource-plugin` + +Dive Into +--------- + +.. automodule:: pydolphinscheduler.resources_plugin.OSS diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst index bdd7dc3a35..2a32526208 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst @@ -58,7 +58,7 @@ How to use Resource plugin can be used in task subclasses and workflows. You can use the resource plugin by adding the `resource_plugin` parameter when they are initialized. For example, local resource plugin, add `resource_plugin = Local("/tmp")`. -The resource plugin we currently support are `local`, `github`. +The resource plugin we currently support are `local`, `github`, `gitlab`, `OSS`, `S3`. Here is an example. diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/s3.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/s3.rst new file mode 100644 index 0000000000..f5bc1d37fe --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/s3.rst @@ -0,0 +1,36 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +S3 +== + +`S3` is a Amazon S3 resource plugin for pydolphinscheduler. + +When using a Amazon S3 resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition, +such as `resource_plugin=S3(prefix="xxx")`, if the file is publicly readable. + +When the file is private, using `resource_plugin=S3(prefix="xxx", access_key_id="xxx", access_key_secret="xxx")` + +You can view this `document `_ +when creating a pair `AccessKey`. + +For the specific use of resource plugins, you can see `How to use` in :doc:`resource-plugin` + +Dive Into +--------- + +.. automodule:: pydolphinscheduler.resources_plugin.S3 diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py b/dolphinscheduler-python/pydolphinscheduler/setup.py index 2a1291548f..66a1ffc86c 100644 --- a/dolphinscheduler-python/pydolphinscheduler/setup.py +++ b/dolphinscheduler-python/pydolphinscheduler/setup.py @@ -36,6 +36,9 @@ version = "dev" # Start package required prod = [ + "boto3>=1.23.10", + "oss2>=2.16.0", + "python-gitlab>=2.10.1", "click>=8.0.0", "py4j~=0.10", "ruamel.yaml", diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index fd640c512f..de5ce26002 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -111,3 +111,10 @@ class ResourceKey(str): """Constants for key of resource.""" ID = "id" + + +class Symbol(str): + """Constants for symbol.""" + + SLASH = "/" + POINT = "." diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py index 110988401f..1e24e1eb87 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py @@ -17,9 +17,9 @@ """Init resources_plugin package.""" from pydolphinscheduler.resources_plugin.github import GitHub +from pydolphinscheduler.resources_plugin.gitlab import GitLab from pydolphinscheduler.resources_plugin.local import Local +from pydolphinscheduler.resources_plugin.oss import OSS +from pydolphinscheduler.resources_plugin.s3 import S3 -__all__ = [ - "Local", - "GitHub", -] +__all__ = ["Local", "GitHub", "GitLab", "OSS", "S3"] diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/bucket.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/bucket.py new file mode 100644 index 0000000000..bae4366c81 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/bucket.py @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""DolphinScheduler BucketFileInfo and Bucket object.""" +from abc import ABCMeta, abstractmethod +from typing import Optional + + +class BucketFileInfo: + """A class that defines the details of BUCKET files. + + :param bucket: A string representing the bucket to which the bucket file belongs. + :param file_path: A string representing the bucket file path. + """ + + def __init__( + self, + bucket: Optional[str] = None, + file_path: Optional[str] = None, + *args, + **kwargs + ): + self.bucket = bucket + self.file_path = file_path + + +class OSSFileInfo(BucketFileInfo): + """A class that defines the details of OSS files. + + :param endpoint: A string representing the OSS file endpoint. + :param bucket: A string representing the bucket to which the OSS file belongs. + :param file_path: A string representing the OSS file path. + """ + + def __init__( + self, + endpoint: Optional[str] = None, + bucket: Optional[str] = None, + file_path: Optional[str] = None, + *args, + **kwargs + ): + super().__init__(bucket=bucket, file_path=file_path, *args, **kwargs) + self.endpoint = endpoint + + +class S3FileInfo(BucketFileInfo): + """A class that defines the details of S3 files. + + :param bucket: A string representing the bucket to which the S3 file belongs. + :param file_path: A string representing the S3 file path. + """ + + def __init__( + self, + bucket: Optional[str] = None, + file_path: Optional[str] = None, + *args, + **kwargs + ): + super().__init__(bucket=bucket, file_path=file_path, *args, **kwargs) + + +class Bucket(object, metaclass=ABCMeta): + """An abstract class of online code repository based on git implementation.""" + + _bucket_file_info: Optional = None + + @abstractmethod + def get_bucket_file_info(self, path: str): + """Get the detailed information of BUCKET file according to the file URL.""" + raise NotImplementedError diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/git.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/git.py index a36ab50547..4fc2a17ccb 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/git.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/git.py @@ -73,19 +73,43 @@ class GitHubFileInfo(GitFileInfo): ) -# [start Git] +class GitLabFileInfo(GitFileInfo): + """A class that defines the details of GitLab files. + + :param host: A string representing the domain name the GitLab file belongs to. + :param user: A string representing the user the GitLab file belongs to. + :param repo_name: A string representing the repository to which the GitLab file belongs. + :param branch: A string representing the branch to which the GitHub file belongs. + :param file_path: A string representing the GitHub file path. + """ + + def __init__( + self, + host: Optional[str] = None, + user: Optional[str] = None, + repo_name: Optional[str] = None, + branch: Optional[str] = None, + file_path: Optional[str] = None, + *args, + **kwargs + ): + super().__init__( + user=user, + repo_name=repo_name, + branch=branch, + file_path=file_path, + *args, + **kwargs + ) + self.host = host + + class Git(object, metaclass=ABCMeta): - """An abstract class of online code warehouse based on git implementation.""" + """An abstract class of online code repository based on git implementation.""" - _git_file_info: Optional[GitFileInfo] = None + _git_file_info: Optional = None - # [start abstractmethod git_file_info] @abstractmethod def get_git_file_info(self, path: str): """Get the detailed information of GIT file according to the file URL.""" raise NotImplementedError - - # [end abstractmethod git_file_info] - - -# [end Git] diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/github.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/github.py index 95a43a8ce4..45648647c6 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/github.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/github.py @@ -22,6 +22,7 @@ from urllib.parse import urljoin import requests +from pydolphinscheduler.constants import Symbol from pydolphinscheduler.core.resource_plugin import ResourcePlugin from pydolphinscheduler.resources_plugin.base.git import Git, GitHubFileInfo @@ -39,6 +40,8 @@ class GitHub(ResourcePlugin, Git): super().__init__(prefix, *args, **kwargs) self.access_token = access_token + _git_file_info: Optional[GitHubFileInfo] = None + def build_req_api( self, user: str, @@ -54,8 +57,8 @@ class GitHub(ResourcePlugin, Git): def get_git_file_info(self, path: str): """Get file information from the file url, like repository name, user, branch, and file path.""" - elements = path.split("/") - index = self.get_index(path, "/", 7) + elements = path.split(Symbol.SLASH) + index = self.get_index(path, Symbol.SLASH, 7) index = index + 1 file_info = GitHubFileInfo( user=elements[3], diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/gitlab.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/gitlab.py new file mode 100644 index 0000000000..f035ecaeff --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/gitlab.py @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""DolphinScheduler gitlab resource plugin.""" +from typing import Optional +from urllib.parse import urljoin, urlparse + +import gitlab +import requests + +from pydolphinscheduler.constants import Symbol +from pydolphinscheduler.core.resource_plugin import ResourcePlugin +from pydolphinscheduler.resources_plugin.base.git import Git, GitLabFileInfo + + +class GitLab(ResourcePlugin, Git): + """GitLab object, declare GitLab resource plugin for task and workflow to dolphinscheduler. + + :param prefix: A string representing the prefix of GitLab. + :param private_token: A string used for identity authentication of GitLab private or Internal repository. + :param oauth_token: A string used for identity authentication of GitLab private or Internal repository. + :param username: A string representing the user of the repository. + :param password: A string representing the user password. + """ + + def __init__( + self, + prefix: str, + private_token: Optional[str] = None, + oauth_token: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + *args, + **kwargs, + ): + super().__init__(prefix, *args, **kwargs) + self.private_token = private_token + self.oauth_token = oauth_token + self.username = username + self.password = password + + def get_git_file_info(self, path: str): + """Get file information from the file url, like repository name, user, branch, and file path.""" + self.get_index(path, Symbol.SLASH, 8) + result = urlparse(path) + elements = result.path.split(Symbol.SLASH) + self._git_file_info = GitLabFileInfo( + host=f"{result.scheme}://{result.hostname}", + repo_name=elements[2], + branch=elements[5], + file_path=Symbol.SLASH.join( + str(elements[i]) for i in range(6, len(elements)) + ), + user=elements[1], + ) + + def authentication(self): + """Gitlab authentication.""" + host = self._git_file_info.host + if self.private_token is not None: + return gitlab.Gitlab(host, private_token=self.private_token) + if self.oauth_token is not None: + return gitlab.Gitlab(host, oauth_token=self.oauth_token) + if self.username is not None and self.password is not None: + oauth_token = self.OAuth_token() + return gitlab.Gitlab(host, oauth_token=oauth_token) + return gitlab.Gitlab(host) + + def OAuth_token(self): + """Obtain OAuth Token.""" + data = { + "grant_type": "password", + "username": self.username, + "password": self.password, + } + host = self._git_file_info.host + resp = requests.post("%s/oauth/token" % host, data=data) + oauth_token = resp.json()["access_token"] + return oauth_token + + def read_file(self, suf: str): + """Get the content of the file. + + The address of the file is the prefix of the resource plugin plus the parameter suf. + """ + path = urljoin(self.prefix, suf) + self.get_git_file_info(path) + gl = self.authentication() + project = gl.projects.get( + "%s/%s" % (self._git_file_info.user, self._git_file_info.repo_name) + ) + return ( + project.files.get( + file_path=self._git_file_info.file_path, ref=self._git_file_info.branch + ) + .decode() + .decode() + ) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py index 8a20ed9737..c1fc56d3d3 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py @@ -28,7 +28,6 @@ class Local(ResourcePlugin): """Local object, declare local resource plugin for task and workflow to dolphinscheduler. :param prefix: A string representing the prefix of Local. - """ # [start init_method] diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/oss.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/oss.py new file mode 100644 index 0000000000..1a9acbb9ca --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/oss.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""DolphinScheduler oss resource plugin.""" +from typing import Optional +from urllib.parse import urljoin, urlparse + +import oss2 + +from pydolphinscheduler.constants import Symbol +from pydolphinscheduler.core.resource_plugin import ResourcePlugin +from pydolphinscheduler.resources_plugin.base.bucket import Bucket, OSSFileInfo + + +class OSS(ResourcePlugin, Bucket): + """OSS object, declare OSS resource plugin for task and workflow to dolphinscheduler. + + :param prefix: A string representing the prefix of OSS. + :param access_key_id: A string representing the ID of AccessKey for AliCloud OSS. + :param access_key_secret: A string representing the secret of AccessKey for AliCloud OSS. + """ + + def __init__( + self, + prefix: str, + access_key_id: Optional[str] = None, + access_key_secret: Optional[str] = None, + *args, + **kwargs, + ): + super().__init__(prefix, *args, **kwargs) + self.access_key_id = access_key_id + self.access_key_secret = access_key_secret + + _bucket_file_info: Optional[OSSFileInfo] = None + + def get_bucket_file_info(self, path: str): + """Get file information from the file url, like repository name, user, branch, and file path.""" + self.get_index(path, Symbol.SLASH, 3) + result = urlparse(path) + hostname = result.hostname + elements = hostname.split(Symbol.POINT) + self._bucket_file_info = OSSFileInfo( + endpoint=f"{result.scheme}://" + f"{Symbol.POINT.join(str(elements[i]) for i in range(1, len(elements)))}", + bucket=hostname.split(Symbol.POINT)[0], + file_path=result.path[1:], + ) + + def read_file(self, suf: str): + """Get the content of the file. + + The address of the file is the prefix of the resource plugin plus the parameter suf. + """ + path = urljoin(self.prefix, suf) + self.get_bucket_file_info(path) + auth = oss2.Auth(self.access_key_id, self.access_key_secret) + bucket = oss2.Bucket( + auth, self._bucket_file_info.endpoint, self._bucket_file_info.bucket + ) + result = bucket.get_object(self._bucket_file_info.file_path).read().decode() + return result.read().decode() diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/s3.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/s3.py new file mode 100644 index 0000000000..da1fe83fd1 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/s3.py @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""DolphinScheduler S3 resource plugin.""" + +from typing import Optional +from urllib.parse import urljoin + +import boto3 + +from pydolphinscheduler.constants import Symbol +from pydolphinscheduler.core.resource_plugin import ResourcePlugin +from pydolphinscheduler.resources_plugin.base.bucket import Bucket, S3FileInfo + + +class S3(ResourcePlugin, Bucket): + """S3 object, declare S3 resource plugin for task and workflow to dolphinscheduler. + + :param prefix: A string representing the prefix of S3. + :param access_key_id: A string representing the ID of AccessKey for Amazon S3. + :param access_key_secret: A string representing the secret of AccessKey for Amazon S3. + """ + + def __init__( + self, + prefix: str, + access_key_id: Optional[str] = None, + access_key_secret: Optional[str] = None, + *args, + **kwargs + ): + super().__init__(prefix, *args, **kwargs) + self.access_key_id = access_key_id + self.access_key_secret = access_key_secret + + _bucket_file_info: Optional[S3FileInfo] = None + + def get_bucket_file_info(self, path: str): + """Get file information from the file url, like repository name, user, branch, and file path.""" + elements = path.split(Symbol.SLASH) + self.get_index(path, Symbol.SLASH, 3) + self._bucket_file_info = S3FileInfo( + bucket=elements[2].split(Symbol.POINT)[0], + file_path=Symbol.SLASH.join( + str(elements[i]) for i in range(3, len(elements)) + ), + ) + + def read_file(self, suf: str): + """Get the content of the file. + + The address of the file is the prefix of the resource plugin plus the parameter suf. + """ + path = urljoin(self.prefix, suf) + self.get_bucket_file_info(path) + bucket = self._bucket_file_info.bucket + key = self._bucket_file_info.file_path + s3_resource = boto3.resource("s3") + s3_object = s3_resource.Object(bucket, key) + return s3_object.get()["Body"].read().decode("utf-8") diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_gitlab.py b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_gitlab.py new file mode 100644 index 0000000000..6bb90acc72 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_gitlab.py @@ -0,0 +1,116 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test github resource plugin.""" +import pytest + +from pydolphinscheduler.resources_plugin.gitlab import GitLab + + +@pytest.mark.parametrize( + "attr, expected", + [ + ( + "https://gitlab.com/pydolphinscheduler/ds-gitlab/-/blob/main/union.sh", + { + "branch": "main", + "file_path": "union.sh", + "host": "https://gitlab.com", + "repo_name": "ds-gitlab", + "user": "pydolphinscheduler", + }, + ), + ( + "https://gitlab.com/pydolphinscheduler/ds/-/blob/dev/test/exc.sh", + { + "branch": "dev", + "file_path": "test/exc.sh", + "host": "https://gitlab.com", + "repo_name": "ds", + "user": "pydolphinscheduler", + }, + ), + ], +) +def test_gitlab_get_git_file_info(attr, expected): + """Test the get_file_info function of the gitlab resource plugin.""" + gitlab = GitLab(prefix="prefix") + gitlab.get_git_file_info(attr) + assert expected == gitlab._git_file_info.__dict__ + + +@pytest.mark.skip(reason="This test needs gitlab service") +@pytest.mark.parametrize( + "attr, expected", + [ + ( + { + "init": { + "prefix": "https://gitlab.com/pydolphinscheduler/ds-internal/-/blob/main", + "oauth_token": "24518bd4cf5bfe9xx", + }, + "file_path": "union.sh", + }, + "test gitlab resource plugin\n", + ), + ( + { + "init": { + "prefix": "https://gitlab.com/pydolphinscheduler/ds/-/blob/main", + "private_token": "9TyTe2xx", + }, + "file_path": "union.sh", + }, + "test gitlab resource plugin\n", + ), + ( + { + "init": { + "prefix": "https://gitlab.com/pydolphinscheduler/ds-gitlab/-/blob/main", + "username": "pydolphinscheduler", + "password": "4295xx", + }, + "file_path": "union.sh", + }, + "test gitlab resource plugin\n", + ), + ( + { + "init": { + "prefix": "https://gitlab.com/pydolphinscheduler/ds-public/-/blob/main", + }, + "file_path": "union.sh", + }, + "test gitlab resource plugin\n", + ), + ( + { + "init": { + "prefix": "https://gitlab.com/pydolphinscheduler/ds-internal/-/blob/main", + "username": "pydolphinscheduler", + "password": "429xxx", + }, + "file_path": "union.sh", + }, + "test gitlab resource plugin\n", + ), + ], +) +def test_gitlab_read_file(attr, expected): + """Test the read_file function of the gitlab resource plug-in.""" + gitlab = GitLab(**attr.get("init")) + assert expected == gitlab.read_file(attr.get("file_path")) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_oss.py b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_oss.py new file mode 100644 index 0000000000..7e57e8230e --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_oss.py @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test oss resource plugin.""" +import pytest + +from pydolphinscheduler.resources_plugin.oss import OSS + + +@pytest.mark.parametrize( + "attr, expected", + [ + ( + "https://ospp-ds-private.oss-cn-hangzhou.aliyuncs.com/a.sh", + { + "endpoint": "https://oss-cn-hangzhou.aliyuncs.com", + "file_path": "a.sh", + "bucket": "ospp-ds-private", + }, + ), + ( + "https://ospp-ds-public.oss-cn-hangzhou.aliyuncs.com/dir/a.sh", + { + "endpoint": "https://oss-cn-hangzhou.aliyuncs.com", + "file_path": "dir/a.sh", + "bucket": "ospp-ds-public", + }, + ), + ], +) +def test_oss_get_bucket_file_info(attr, expected): + """Test the get_bucket_file_info function of the oss resource plugin.""" + oss = OSS(prefix="prefix") + oss.get_bucket_file_info(attr) + assert expected == oss._bucket_file_info.__dict__ + + +@pytest.mark.skip(reason="This test requires OSS services") +@pytest.mark.parametrize( + "attr, expected", + [ + ( + { + "init": { + "prefix": "https://ospp-ds-private.oss-cn-hangzhou.aliyuncs.com", + "access_key_id": "LTAI5tP25Mxx", + "access_key_secret": "cSur23Qbxx", + }, + "file_path": "a.sh", + }, + "test oss resource plugin\n", + ), + ( + { + "init": { + "prefix": "https://ospp-ds-private.oss-cn-hangzhou.aliyuncs.com/dir/", + "access_key_id": "LTAxx", + "access_key_secret": "cSur23Qxx", + }, + "file_path": "b.sh", + }, + "test oss resource plugin\n", + ), + ( + { + "init": { + "prefix": "https://ospp-ds-private.oss-cn-hangzhou.aliyuncs.com", + }, + "file_path": "b.sh", + }, + "test oss resource plugin\n", + ), + ( + { + "init": { + "prefix": "https://ospp-ds-public.oss-cn-hangzhou.aliyuncs.com", + }, + "file_path": "b.sh", + }, + "test oss resource plugin\n", + ), + ( + { + "init": { + "prefix": "https://ospp-ds-public.oss-cn-hangzhou.aliyuncs.com/dir/", + "access_key_id": "LTAIxx", + "access_key_secret": "cSurxx", + }, + "file_path": "a.sh", + }, + "test oss resource plugin\n", + ), + ], +) +def test_oss_read_file(attr, expected): + """Test the read_file function of the oss resource plug-in.""" + oss = OSS(**attr.get("init")) + assert expected == oss.read_file(attr.get("file_path")) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_s3.py b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_s3.py new file mode 100644 index 0000000000..5f75f3eb75 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_s3.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test oss resource plugin.""" +import pytest + +from pydolphinscheduler.resources_plugin import S3 + + +@pytest.mark.parametrize( + "attr, expected", + [ + ( + "https://ds-resource-plugin-private.s3.amazonaws.com/a.sh", + { + "file_path": "a.sh", + "bucket": "ds-resource-plugin-private", + }, + ), + ( + "https://ds-resource-plugin-public.s3.amazonaws.com/dir/a.sh", + { + "file_path": "dir/a.sh", + "bucket": "ds-resource-plugin-public", + }, + ), + ], +) +def test_s3_get_bucket_file_info(attr, expected): + """Test the get_bucket_file_info function of the s3 resource plugin.""" + s3 = S3(prefix="prefix") + s3.get_bucket_file_info(attr) + assert expected == s3._bucket_file_info.__dict__ + + +@pytest.mark.skip(reason="This test requires s3 services") +@pytest.mark.parametrize( + "attr, expected", + [ + ( + { + "init": { + "prefix": "https://ds-resource-plugin-private.s3.amazonaws.com/dir/", + "access_key_id": "LTAI5tP25Mxx", + "access_key_secret": "cSur23Qbxx", + }, + "file_path": "a.sh", + }, + "test s3 resource plugin\n", + ), + ( + { + "init": { + "prefix": "https://ds-resource-plugin-public.s3.amazonaws.com/", + }, + "file_path": "a.sh", + }, + "test s3 resource plugin\n", + ), + ], +) +def test_s3_read_file(attr, expected): + """Test the read_file function of the s3 resource plug-in.""" + s3 = S3(**attr.get("init")) + assert expected == s3.read_file(attr.get("file_path"))