Browse Source

[python] Add new resource plugin: gitlab, OSS and S3 (#12025)

Co-authored-by: Jiajie Zhong <zhongjiajie955@gmail.com>
3.2.0-release
chenrj 2 years ago committed by GitHub
parent
commit
e1ac0e2605
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/github.rst
  2. 46
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/gitlab.rst
  3. 5
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst
  4. 44
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/oss.rst
  5. 2
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst
  6. 36
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/s3.rst
  7. 3
      dolphinscheduler-python/pydolphinscheduler/setup.py
  8. 7
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  9. 8
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py
  10. 86
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/bucket.py
  11. 42
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/git.py
  12. 7
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/github.py
  13. 112
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/gitlab.py
  14. 1
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py
  15. 76
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/oss.py
  16. 74
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/s3.py
  17. 116
      dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_gitlab.py
  18. 112
      dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_oss.py
  19. 79
      dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_s3.py

2
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/github.rst

@ -22,7 +22,7 @@ GitHub
When using a github resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition, When using a github resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition,
such as `resource_plugin=GitHub(prefix="https://github.com/xxx", access_token="ghpxx")`. such as `resource_plugin=GitHub(prefix="https://github.com/xxx", access_token="ghpxx")`.
The token parameter is optional. You need to add it when your warehouse is a private repository. The token parameter is optional. You need to add it when your repository is a private repository.
You can view this `document <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token>`_ You can view this `document <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token>`_
when creating a token. when creating a token.

46
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/gitlab.rst

@ -0,0 +1,46 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
GitLab
======
`GitLab` is a gitlab resource plugin for pydolphinscheduler.
When using a gitlab resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition,
such as `resource_plugin=GitLab(prefix="xxx")`, if it is a public repository.
If it is a private or Internal repository, you can use three ways to obtain authentication.
The first is `Personal Access Tokens`, using `resource_plugin=GitLab(prefix="xxx", private_token="xxx")`.
The second method is to obtain authentication through `username` and `password`:
using `resource_plugin=GitLab(prefix="xxx", username="username", password="pwd")`.
The third method is to obtain authentication through `OAuth Token`:
using `resource_plugin=GitLab(prefix="xxx", oauth_token="xx")`.
You can view this `document <https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html#create-a-personal-access-token>`_
when creating a `Personal Access Tokens`.
For the specific use of resource plugins, you can see `How to use` in :doc:`resource-plugin`
Dive Into
---------
.. automodule:: pydolphinscheduler.resources_plugin.gitlab

5
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst

@ -26,4 +26,7 @@ In this section
develop develop
resource-plugin resource-plugin
local local
github github
gitlab
oss
s3

44
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/oss.rst

@ -0,0 +1,44 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
OSS
===
`OSS` is a Aliyun OSS resource plugin for pydolphinscheduler.
When using a OSS resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition,
such as `resource_plugin=OSS(prefix="xxx")`, if the file is publicly readable.
When the file is private, using `resource_plugin=OSS(prefix="xxx", access_key_id="xxx", access_key_secret="xxx")`
Notice
The read permission of files in a bucket is inherited from the bucket by default. In other words, if the bucket is private,
the files in it are also private.
But the read permission of the files in the bucket can be changed, in other words, the files in the private bucket can also be read publicly.
So whether the `AccessKey` is needed depends on whether the file is private or not.
You can view this `document <https://www.alibabacloud.com/help/en/tablestore/latest/how-can-i-obtain-an-accesskey-pair>`_
when creating a pair `AccessKey`.
For the specific use of resource plugins, you can see `How to use` in :doc:`resource-plugin`
Dive Into
---------
.. automodule:: pydolphinscheduler.resources_plugin.OSS

2
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst

@ -58,7 +58,7 @@ How to use
Resource plugin can be used in task subclasses and workflows. You can use the resource plugin by adding the `resource_plugin` parameter when they are initialized. Resource plugin can be used in task subclasses and workflows. You can use the resource plugin by adding the `resource_plugin` parameter when they are initialized.
For example, local resource plugin, add `resource_plugin = Local("/tmp")`. For example, local resource plugin, add `resource_plugin = Local("/tmp")`.
The resource plugin we currently support are `local`, `github`. The resource plugin we currently support are `local`, `github`, `gitlab`, `OSS`, `S3`.
Here is an example. Here is an example.

36
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/s3.rst

@ -0,0 +1,36 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
S3
==
`S3` is a Amazon S3 resource plugin for pydolphinscheduler.
When using a Amazon S3 resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition,
such as `resource_plugin=S3(prefix="xxx")`, if the file is publicly readable.
When the file is private, using `resource_plugin=S3(prefix="xxx", access_key_id="xxx", access_key_secret="xxx")`
You can view this `document <https://docs.aws.amazon.com/general/latest/gr/aws-access-keys-best-practices.html>`_
when creating a pair `AccessKey`.
For the specific use of resource plugins, you can see `How to use` in :doc:`resource-plugin`
Dive Into
---------
.. automodule:: pydolphinscheduler.resources_plugin.S3

3
dolphinscheduler-python/pydolphinscheduler/setup.py

@ -36,6 +36,9 @@ version = "dev"
# Start package required # Start package required
prod = [ prod = [
"boto3>=1.23.10",
"oss2>=2.16.0",
"python-gitlab>=2.10.1",
"click>=8.0.0", "click>=8.0.0",
"py4j~=0.10", "py4j~=0.10",
"ruamel.yaml", "ruamel.yaml",

7
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -111,3 +111,10 @@ class ResourceKey(str):
"""Constants for key of resource.""" """Constants for key of resource."""
ID = "id" ID = "id"
class Symbol(str):
"""Constants for symbol."""
SLASH = "/"
POINT = "."

8
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py

@ -17,9 +17,9 @@
"""Init resources_plugin package.""" """Init resources_plugin package."""
from pydolphinscheduler.resources_plugin.github import GitHub from pydolphinscheduler.resources_plugin.github import GitHub
from pydolphinscheduler.resources_plugin.gitlab import GitLab
from pydolphinscheduler.resources_plugin.local import Local from pydolphinscheduler.resources_plugin.local import Local
from pydolphinscheduler.resources_plugin.oss import OSS
from pydolphinscheduler.resources_plugin.s3 import S3
__all__ = [ __all__ = ["Local", "GitHub", "GitLab", "OSS", "S3"]
"Local",
"GitHub",
]

86
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/bucket.py

@ -0,0 +1,86 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""DolphinScheduler BucketFileInfo and Bucket object."""
from abc import ABCMeta, abstractmethod
from typing import Optional
class BucketFileInfo:
"""A class that defines the details of BUCKET files.
:param bucket: A string representing the bucket to which the bucket file belongs.
:param file_path: A string representing the bucket file path.
"""
def __init__(
self,
bucket: Optional[str] = None,
file_path: Optional[str] = None,
*args,
**kwargs
):
self.bucket = bucket
self.file_path = file_path
class OSSFileInfo(BucketFileInfo):
"""A class that defines the details of OSS files.
:param endpoint: A string representing the OSS file endpoint.
:param bucket: A string representing the bucket to which the OSS file belongs.
:param file_path: A string representing the OSS file path.
"""
def __init__(
self,
endpoint: Optional[str] = None,
bucket: Optional[str] = None,
file_path: Optional[str] = None,
*args,
**kwargs
):
super().__init__(bucket=bucket, file_path=file_path, *args, **kwargs)
self.endpoint = endpoint
class S3FileInfo(BucketFileInfo):
"""A class that defines the details of S3 files.
:param bucket: A string representing the bucket to which the S3 file belongs.
:param file_path: A string representing the S3 file path.
"""
def __init__(
self,
bucket: Optional[str] = None,
file_path: Optional[str] = None,
*args,
**kwargs
):
super().__init__(bucket=bucket, file_path=file_path, *args, **kwargs)
class Bucket(object, metaclass=ABCMeta):
"""An abstract class of online code repository based on git implementation."""
_bucket_file_info: Optional = None
@abstractmethod
def get_bucket_file_info(self, path: str):
"""Get the detailed information of BUCKET file according to the file URL."""
raise NotImplementedError

42
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/base/git.py

@ -73,19 +73,43 @@ class GitHubFileInfo(GitFileInfo):
) )
# [start Git] class GitLabFileInfo(GitFileInfo):
"""A class that defines the details of GitLab files.
:param host: A string representing the domain name the GitLab file belongs to.
:param user: A string representing the user the GitLab file belongs to.
:param repo_name: A string representing the repository to which the GitLab file belongs.
:param branch: A string representing the branch to which the GitHub file belongs.
:param file_path: A string representing the GitHub file path.
"""
def __init__(
self,
host: Optional[str] = None,
user: Optional[str] = None,
repo_name: Optional[str] = None,
branch: Optional[str] = None,
file_path: Optional[str] = None,
*args,
**kwargs
):
super().__init__(
user=user,
repo_name=repo_name,
branch=branch,
file_path=file_path,
*args,
**kwargs
)
self.host = host
class Git(object, metaclass=ABCMeta): class Git(object, metaclass=ABCMeta):
"""An abstract class of online code warehouse based on git implementation.""" """An abstract class of online code repository based on git implementation."""
_git_file_info: Optional[GitFileInfo] = None _git_file_info: Optional = None
# [start abstractmethod git_file_info]
@abstractmethod @abstractmethod
def get_git_file_info(self, path: str): def get_git_file_info(self, path: str):
"""Get the detailed information of GIT file according to the file URL.""" """Get the detailed information of GIT file according to the file URL."""
raise NotImplementedError raise NotImplementedError
# [end abstractmethod git_file_info]
# [end Git]

7
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/github.py

@ -22,6 +22,7 @@ from urllib.parse import urljoin
import requests import requests
from pydolphinscheduler.constants import Symbol
from pydolphinscheduler.core.resource_plugin import ResourcePlugin from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.resources_plugin.base.git import Git, GitHubFileInfo from pydolphinscheduler.resources_plugin.base.git import Git, GitHubFileInfo
@ -39,6 +40,8 @@ class GitHub(ResourcePlugin, Git):
super().__init__(prefix, *args, **kwargs) super().__init__(prefix, *args, **kwargs)
self.access_token = access_token self.access_token = access_token
_git_file_info: Optional[GitHubFileInfo] = None
def build_req_api( def build_req_api(
self, self,
user: str, user: str,
@ -54,8 +57,8 @@ class GitHub(ResourcePlugin, Git):
def get_git_file_info(self, path: str): def get_git_file_info(self, path: str):
"""Get file information from the file url, like repository name, user, branch, and file path.""" """Get file information from the file url, like repository name, user, branch, and file path."""
elements = path.split("/") elements = path.split(Symbol.SLASH)
index = self.get_index(path, "/", 7) index = self.get_index(path, Symbol.SLASH, 7)
index = index + 1 index = index + 1
file_info = GitHubFileInfo( file_info = GitHubFileInfo(
user=elements[3], user=elements[3],

112
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/gitlab.py

@ -0,0 +1,112 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""DolphinScheduler gitlab resource plugin."""
from typing import Optional
from urllib.parse import urljoin, urlparse
import gitlab
import requests
from pydolphinscheduler.constants import Symbol
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.resources_plugin.base.git import Git, GitLabFileInfo
class GitLab(ResourcePlugin, Git):
"""GitLab object, declare GitLab resource plugin for task and workflow to dolphinscheduler.
:param prefix: A string representing the prefix of GitLab.
:param private_token: A string used for identity authentication of GitLab private or Internal repository.
:param oauth_token: A string used for identity authentication of GitLab private or Internal repository.
:param username: A string representing the user of the repository.
:param password: A string representing the user password.
"""
def __init__(
self,
prefix: str,
private_token: Optional[str] = None,
oauth_token: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
*args,
**kwargs,
):
super().__init__(prefix, *args, **kwargs)
self.private_token = private_token
self.oauth_token = oauth_token
self.username = username
self.password = password
def get_git_file_info(self, path: str):
"""Get file information from the file url, like repository name, user, branch, and file path."""
self.get_index(path, Symbol.SLASH, 8)
result = urlparse(path)
elements = result.path.split(Symbol.SLASH)
self._git_file_info = GitLabFileInfo(
host=f"{result.scheme}://{result.hostname}",
repo_name=elements[2],
branch=elements[5],
file_path=Symbol.SLASH.join(
str(elements[i]) for i in range(6, len(elements))
),
user=elements[1],
)
def authentication(self):
"""Gitlab authentication."""
host = self._git_file_info.host
if self.private_token is not None:
return gitlab.Gitlab(host, private_token=self.private_token)
if self.oauth_token is not None:
return gitlab.Gitlab(host, oauth_token=self.oauth_token)
if self.username is not None and self.password is not None:
oauth_token = self.OAuth_token()
return gitlab.Gitlab(host, oauth_token=oauth_token)
return gitlab.Gitlab(host)
def OAuth_token(self):
"""Obtain OAuth Token."""
data = {
"grant_type": "password",
"username": self.username,
"password": self.password,
}
host = self._git_file_info.host
resp = requests.post("%s/oauth/token" % host, data=data)
oauth_token = resp.json()["access_token"]
return oauth_token
def read_file(self, suf: str):
"""Get the content of the file.
The address of the file is the prefix of the resource plugin plus the parameter suf.
"""
path = urljoin(self.prefix, suf)
self.get_git_file_info(path)
gl = self.authentication()
project = gl.projects.get(
"%s/%s" % (self._git_file_info.user, self._git_file_info.repo_name)
)
return (
project.files.get(
file_path=self._git_file_info.file_path, ref=self._git_file_info.branch
)
.decode()
.decode()
)

1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py

@ -28,7 +28,6 @@ class Local(ResourcePlugin):
"""Local object, declare local resource plugin for task and workflow to dolphinscheduler. """Local object, declare local resource plugin for task and workflow to dolphinscheduler.
:param prefix: A string representing the prefix of Local. :param prefix: A string representing the prefix of Local.
""" """
# [start init_method] # [start init_method]

76
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/oss.py

@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""DolphinScheduler oss resource plugin."""
from typing import Optional
from urllib.parse import urljoin, urlparse
import oss2
from pydolphinscheduler.constants import Symbol
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.resources_plugin.base.bucket import Bucket, OSSFileInfo
class OSS(ResourcePlugin, Bucket):
"""OSS object, declare OSS resource plugin for task and workflow to dolphinscheduler.
:param prefix: A string representing the prefix of OSS.
:param access_key_id: A string representing the ID of AccessKey for AliCloud OSS.
:param access_key_secret: A string representing the secret of AccessKey for AliCloud OSS.
"""
def __init__(
self,
prefix: str,
access_key_id: Optional[str] = None,
access_key_secret: Optional[str] = None,
*args,
**kwargs,
):
super().__init__(prefix, *args, **kwargs)
self.access_key_id = access_key_id
self.access_key_secret = access_key_secret
_bucket_file_info: Optional[OSSFileInfo] = None
def get_bucket_file_info(self, path: str):
"""Get file information from the file url, like repository name, user, branch, and file path."""
self.get_index(path, Symbol.SLASH, 3)
result = urlparse(path)
hostname = result.hostname
elements = hostname.split(Symbol.POINT)
self._bucket_file_info = OSSFileInfo(
endpoint=f"{result.scheme}://"
f"{Symbol.POINT.join(str(elements[i]) for i in range(1, len(elements)))}",
bucket=hostname.split(Symbol.POINT)[0],
file_path=result.path[1:],
)
def read_file(self, suf: str):
"""Get the content of the file.
The address of the file is the prefix of the resource plugin plus the parameter suf.
"""
path = urljoin(self.prefix, suf)
self.get_bucket_file_info(path)
auth = oss2.Auth(self.access_key_id, self.access_key_secret)
bucket = oss2.Bucket(
auth, self._bucket_file_info.endpoint, self._bucket_file_info.bucket
)
result = bucket.get_object(self._bucket_file_info.file_path).read().decode()
return result.read().decode()

74
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/s3.py

@ -0,0 +1,74 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""DolphinScheduler S3 resource plugin."""
from typing import Optional
from urllib.parse import urljoin
import boto3
from pydolphinscheduler.constants import Symbol
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.resources_plugin.base.bucket import Bucket, S3FileInfo
class S3(ResourcePlugin, Bucket):
"""S3 object, declare S3 resource plugin for task and workflow to dolphinscheduler.
:param prefix: A string representing the prefix of S3.
:param access_key_id: A string representing the ID of AccessKey for Amazon S3.
:param access_key_secret: A string representing the secret of AccessKey for Amazon S3.
"""
def __init__(
self,
prefix: str,
access_key_id: Optional[str] = None,
access_key_secret: Optional[str] = None,
*args,
**kwargs
):
super().__init__(prefix, *args, **kwargs)
self.access_key_id = access_key_id
self.access_key_secret = access_key_secret
_bucket_file_info: Optional[S3FileInfo] = None
def get_bucket_file_info(self, path: str):
"""Get file information from the file url, like repository name, user, branch, and file path."""
elements = path.split(Symbol.SLASH)
self.get_index(path, Symbol.SLASH, 3)
self._bucket_file_info = S3FileInfo(
bucket=elements[2].split(Symbol.POINT)[0],
file_path=Symbol.SLASH.join(
str(elements[i]) for i in range(3, len(elements))
),
)
def read_file(self, suf: str):
"""Get the content of the file.
The address of the file is the prefix of the resource plugin plus the parameter suf.
"""
path = urljoin(self.prefix, suf)
self.get_bucket_file_info(path)
bucket = self._bucket_file_info.bucket
key = self._bucket_file_info.file_path
s3_resource = boto3.resource("s3")
s3_object = s3_resource.Object(bucket, key)
return s3_object.get()["Body"].read().decode("utf-8")

116
dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_gitlab.py

@ -0,0 +1,116 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test github resource plugin."""
import pytest
from pydolphinscheduler.resources_plugin.gitlab import GitLab
@pytest.mark.parametrize(
"attr, expected",
[
(
"https://gitlab.com/pydolphinscheduler/ds-gitlab/-/blob/main/union.sh",
{
"branch": "main",
"file_path": "union.sh",
"host": "https://gitlab.com",
"repo_name": "ds-gitlab",
"user": "pydolphinscheduler",
},
),
(
"https://gitlab.com/pydolphinscheduler/ds/-/blob/dev/test/exc.sh",
{
"branch": "dev",
"file_path": "test/exc.sh",
"host": "https://gitlab.com",
"repo_name": "ds",
"user": "pydolphinscheduler",
},
),
],
)
def test_gitlab_get_git_file_info(attr, expected):
"""Test the get_file_info function of the gitlab resource plugin."""
gitlab = GitLab(prefix="prefix")
gitlab.get_git_file_info(attr)
assert expected == gitlab._git_file_info.__dict__
@pytest.mark.skip(reason="This test needs gitlab service")
@pytest.mark.parametrize(
"attr, expected",
[
(
{
"init": {
"prefix": "https://gitlab.com/pydolphinscheduler/ds-internal/-/blob/main",
"oauth_token": "24518bd4cf5bfe9xx",
},
"file_path": "union.sh",
},
"test gitlab resource plugin\n",
),
(
{
"init": {
"prefix": "https://gitlab.com/pydolphinscheduler/ds/-/blob/main",
"private_token": "9TyTe2xx",
},
"file_path": "union.sh",
},
"test gitlab resource plugin\n",
),
(
{
"init": {
"prefix": "https://gitlab.com/pydolphinscheduler/ds-gitlab/-/blob/main",
"username": "pydolphinscheduler",
"password": "4295xx",
},
"file_path": "union.sh",
},
"test gitlab resource plugin\n",
),
(
{
"init": {
"prefix": "https://gitlab.com/pydolphinscheduler/ds-public/-/blob/main",
},
"file_path": "union.sh",
},
"test gitlab resource plugin\n",
),
(
{
"init": {
"prefix": "https://gitlab.com/pydolphinscheduler/ds-internal/-/blob/main",
"username": "pydolphinscheduler",
"password": "429xxx",
},
"file_path": "union.sh",
},
"test gitlab resource plugin\n",
),
],
)
def test_gitlab_read_file(attr, expected):
"""Test the read_file function of the gitlab resource plug-in."""
gitlab = GitLab(**attr.get("init"))
assert expected == gitlab.read_file(attr.get("file_path"))

112
dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_oss.py

@ -0,0 +1,112 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test oss resource plugin."""
import pytest
from pydolphinscheduler.resources_plugin.oss import OSS
@pytest.mark.parametrize(
"attr, expected",
[
(
"https://ospp-ds-private.oss-cn-hangzhou.aliyuncs.com/a.sh",
{
"endpoint": "https://oss-cn-hangzhou.aliyuncs.com",
"file_path": "a.sh",
"bucket": "ospp-ds-private",
},
),
(
"https://ospp-ds-public.oss-cn-hangzhou.aliyuncs.com/dir/a.sh",
{
"endpoint": "https://oss-cn-hangzhou.aliyuncs.com",
"file_path": "dir/a.sh",
"bucket": "ospp-ds-public",
},
),
],
)
def test_oss_get_bucket_file_info(attr, expected):
"""Test the get_bucket_file_info function of the oss resource plugin."""
oss = OSS(prefix="prefix")
oss.get_bucket_file_info(attr)
assert expected == oss._bucket_file_info.__dict__
@pytest.mark.skip(reason="This test requires OSS services")
@pytest.mark.parametrize(
"attr, expected",
[
(
{
"init": {
"prefix": "https://ospp-ds-private.oss-cn-hangzhou.aliyuncs.com",
"access_key_id": "LTAI5tP25Mxx",
"access_key_secret": "cSur23Qbxx",
},
"file_path": "a.sh",
},
"test oss resource plugin\n",
),
(
{
"init": {
"prefix": "https://ospp-ds-private.oss-cn-hangzhou.aliyuncs.com/dir/",
"access_key_id": "LTAxx",
"access_key_secret": "cSur23Qxx",
},
"file_path": "b.sh",
},
"test oss resource plugin\n",
),
(
{
"init": {
"prefix": "https://ospp-ds-private.oss-cn-hangzhou.aliyuncs.com",
},
"file_path": "b.sh",
},
"test oss resource plugin\n",
),
(
{
"init": {
"prefix": "https://ospp-ds-public.oss-cn-hangzhou.aliyuncs.com",
},
"file_path": "b.sh",
},
"test oss resource plugin\n",
),
(
{
"init": {
"prefix": "https://ospp-ds-public.oss-cn-hangzhou.aliyuncs.com/dir/",
"access_key_id": "LTAIxx",
"access_key_secret": "cSurxx",
},
"file_path": "a.sh",
},
"test oss resource plugin\n",
),
],
)
def test_oss_read_file(attr, expected):
"""Test the read_file function of the oss resource plug-in."""
oss = OSS(**attr.get("init"))
assert expected == oss.read_file(attr.get("file_path"))

79
dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_s3.py

@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test oss resource plugin."""
import pytest
from pydolphinscheduler.resources_plugin import S3
@pytest.mark.parametrize(
"attr, expected",
[
(
"https://ds-resource-plugin-private.s3.amazonaws.com/a.sh",
{
"file_path": "a.sh",
"bucket": "ds-resource-plugin-private",
},
),
(
"https://ds-resource-plugin-public.s3.amazonaws.com/dir/a.sh",
{
"file_path": "dir/a.sh",
"bucket": "ds-resource-plugin-public",
},
),
],
)
def test_s3_get_bucket_file_info(attr, expected):
"""Test the get_bucket_file_info function of the s3 resource plugin."""
s3 = S3(prefix="prefix")
s3.get_bucket_file_info(attr)
assert expected == s3._bucket_file_info.__dict__
@pytest.mark.skip(reason="This test requires s3 services")
@pytest.mark.parametrize(
"attr, expected",
[
(
{
"init": {
"prefix": "https://ds-resource-plugin-private.s3.amazonaws.com/dir/",
"access_key_id": "LTAI5tP25Mxx",
"access_key_secret": "cSur23Qbxx",
},
"file_path": "a.sh",
},
"test s3 resource plugin\n",
),
(
{
"init": {
"prefix": "https://ds-resource-plugin-public.s3.amazonaws.com/",
},
"file_path": "a.sh",
},
"test s3 resource plugin\n",
),
],
)
def test_s3_read_file(attr, expected):
"""Test the read_file function of the s3 resource plug-in."""
s3 = S3(**attr.get("init"))
assert expected == s3.read_file(attr.get("file_path"))
Loading…
Cancel
Save