Browse Source

[DSIP-13][python] New mechanism file plugins to Python API (#11360)

3.2.0-release
chenrj 2 years ago committed by GitHub
parent
commit
6d460a3ca2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
  2. 46
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/develop.rst
  3. 28
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst
  4. 32
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/local.rst
  5. 75
      dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst
  6. 3
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  7. 49
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource_plugin.py
  8. 47
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  9. 64
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
  10. 4
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
  11. 23
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py
  12. 57
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py
  13. 5
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
  14. 116
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
  15. 6
      dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
  16. 18
      dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/__init__.py
  17. 108
      dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_local.py
  18. 43
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py

1
dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst

@ -36,6 +36,7 @@ then go and see :doc:`tutorial` for more detail.
cli cli
config config
api api
resources_plugin/index
Indices and tables Indices and tables
================== ==================

46
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/develop.rst

@ -0,0 +1,46 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
How to develop
==============
When you want to create a new resource plugin, you need to add a new class in the module `resources_plugin`.
The resource plug-in class needs to inherit the abstract class `ResourcePlugin` and implement its abstract method `read_file` function.
The parameter of the `__init__` function of `ResourcePlugin` is the prefix of STR type. You can override this function when necessary.
The `read_file` function parameter of `ResourcePlugin` is the file suffix of STR type, and its return value is the file content, if it exists and is readable.
Example
-------
- Method `__init__`: Initiation method with `param`:`prefix`
.. literalinclude:: ../../../src/pydolphinscheduler/resources_plugin/local.py
:start-after: [start init_method]
:end-before: [end init_method]
- Method `read_file`: Get content from the given URI, The function parameter is the suffix of the file path.
The file prefix has been initialized in init of the resource plug-in.
The prefix plus suffix is the absolute path of the file in this resource.
.. literalinclude:: ../../../src/pydolphinscheduler/resources_plugin/local.py
:start-after: [start read_file_method]
:end-before: [end read_file_method]

28
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/index.rst

@ -0,0 +1,28 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Resources_plugin
================
In this section
.. toctree::
:maxdepth: 1
develop
resource-plugin
local

32
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/local.rst

@ -0,0 +1,32 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Local
=====
`Local` is a local resource plugin for pydolphinscheduler.
When using a local resource plugin, you only need to add the `resource_plugin` parameter in the task subclass or workflow definition,
such as `resource_plugin=Local("/tmp")`.
For the specific use of resource plugins, you can see `How to use` in :doc:`./resource-plugin`
Dive Into
---------
.. automodule:: pydolphinscheduler.resources_plugin.local

75
dolphinscheduler-python/pydolphinscheduler/docs/source/resources_plugin/resource-plugin.rst

@ -0,0 +1,75 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
ResourcePlugin
==============
`ResourcePlugin` is an abstract class of resource plug-in parameters of task subclass and workflow.
All resource plugins need to inherit and override its abstract methods.
Code
----
.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
:start-after: [start resource_plugin_definition]
:end-before: [end resource_plugin_definition]
Dive Into
---------
It has the following key functions.
- Method `__init__`: The `__init__` function has STR type parameter `prefix`, which means the prefix of the resource.
You can rewrite this function if necessary.
.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
:start-after: [start init_method]
:end-before: [end init_method]
- Method `read_file`: Get content from the given URI, The function parameter is the suffix of the file path.
The file prefix has been initialized in init of the resource plug-in.
The prefix plus suffix is the absolute path of the file in this resource.
It is an abstract function. You must rewrite it
.. literalinclude:: ../../../src/pydolphinscheduler/core/resource_plugin.py
:start-after: [start abstractmethod read_file]
:end-before: [end abstractmethod read_file]
.. automodule:: pydolphinscheduler.core.resource_plugin
How to use
----------
Resource plug-ins can be used in task subclasses and workflows. You can use the resource plug-ins by adding the `resource_plugin` parameter when they are initialized.
For example, local resource plug-ins, add `resource_plugin = Local("/tmp")`.
The resource plug-ins we currently support is `local`.
Here is an example.
.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial_resource_plugin.py
:start-after: [start workflow_declare]
:end-before: [end task_declare]
When the resource_plugin parameter is defined in both the task subclass and the workflow, the resource_plugin defined in the task subclass is used first.
If the task subclass does not define resource_plugin, but the resource_plugin is defined in the workflow, the resource_plugin in the workflow is used.
Of course, if neither the task subclass nor the workflow specifies resource_plugin, the command at this time will be executed as a script,
in other words, we are forward compatible.

3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional, Set
from pydolphinscheduler import configuration from pydolphinscheduler import configuration
from pydolphinscheduler.constants import TaskType from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.resource import Resource from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
from pydolphinscheduler.java_gateway import JavaGate from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base, Project, Tenant, User from pydolphinscheduler.models import Base, Project, Tenant, User
@ -111,6 +112,7 @@ class ProcessDefinition(Base):
timeout: Optional[int] = 0, timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE, release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None, param: Optional[Dict] = None,
resource_plugin: Optional[ResourcePlugin] = None,
resource_list: Optional[List[Resource]] = None, resource_list: Optional[List[Resource]] = None,
): ):
super().__init__(name, description) super().__init__(name, description)
@ -134,6 +136,7 @@ class ProcessDefinition(Base):
self._release_state = release_state self._release_state = release_state
self.param = param self.param = param
self.tasks: dict = {} self.tasks: dict = {}
self.resource_plugin = resource_plugin
# TODO how to fix circle import # TODO how to fix circle import
self._task_relations: set["TaskRelation"] = set() # noqa: F821 self._task_relations: set["TaskRelation"] = set() # noqa: F821
self._process_definition_code = None self._process_definition_code = None

49
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource_plugin.py

@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""DolphinScheduler ResourcePlugin object."""
from abc import ABCMeta, abstractmethod
# [start resource_plugin_definition]
class ResourcePlugin(object, metaclass=ABCMeta):
"""ResourcePlugin object, declare resource plugin for task and workflow to dolphinscheduler.
:param prefix: A string representing the prefix of ResourcePlugin.
"""
# [start init_method]
def __init__(self, prefix: str, *args, **kwargs):
self.prefix = prefix
# [end init_method]
# [start abstractmethod read_file]
@abstractmethod
def read_file(self, suf: str):
"""Get the content of the file.
The address of the file is the prefix of the resource plugin plus the parameter suf.
"""
# [end abstractmethod read_file]
# [end resource_plugin_definition]

47
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -33,7 +33,8 @@ from pydolphinscheduler.core.process_definition import (
ProcessDefinitionContext, ProcessDefinitionContext,
) )
from pydolphinscheduler.core.resource import Resource from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.exceptions import PyDSParamException, PyResPluginException
from pydolphinscheduler.java_gateway import JavaGate from pydolphinscheduler.java_gateway import JavaGate
from pydolphinscheduler.models import Base from pydolphinscheduler.models import Base
@ -101,6 +102,9 @@ class Task(Base):
_task_custom_attr: set = set() _task_custom_attr: set = set()
ext: set = None
ext_attr: str = None
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]} DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
def __init__( def __init__(
@ -124,6 +128,7 @@ class Task(Base):
dependence: Optional[Dict] = None, dependence: Optional[Dict] = None,
wait_start_timeout: Optional[Dict] = None, wait_start_timeout: Optional[Dict] = None,
condition_result: Optional[Dict] = None, condition_result: Optional[Dict] = None,
resource_plugin: Optional[ResourcePlugin] = None,
): ):
super().__init__(name, description) super().__init__(name, description)
@ -166,6 +171,8 @@ class Task(Base):
self.dependence = dependence or {} self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {} self.wait_start_timeout = wait_start_timeout or {}
self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
self.resource_plugin = resource_plugin
self.get_content()
@property @property
def process_definition(self) -> Optional[ProcessDefinition]: def process_definition(self) -> Optional[ProcessDefinition]:
@ -229,6 +236,44 @@ class Task(Base):
custom_attr |= self._task_custom_attr custom_attr |= self._task_custom_attr
return self.get_define_custom(custom_attr=custom_attr) return self.get_define_custom(custom_attr=custom_attr)
def get_plugin(self):
"""Return the resource plug-in.
according to parameter resource_plugin and parameter
process_definition.resource_plugin.
"""
if self.resource_plugin is None:
if self.process_definition.resource_plugin is not None:
return self.process_definition.resource_plugin
else:
raise PyResPluginException(
"The execution command of this task is a file, but the resource plugin is empty"
)
else:
return self.resource_plugin
def get_content(self):
"""Get the file content according to the resource plugin."""
if self.ext_attr is None and self.ext is None:
return
_ext_attr = getattr(self, self.ext_attr)
if _ext_attr is not None:
if _ext_attr.endswith(tuple(self.ext)):
res = self.get_plugin()
content = res.read_file(_ext_attr)
setattr(self, self.ext_attr.lstrip("_"), content)
else:
index = _ext_attr.rfind(".")
if index != -1:
raise ValueError(
"This task does not support files with suffix {}, only supports {}".format(
_ext_attr[index:], ",".join(str(suf) for suf in self.ext)
)
)
setattr(self, self.ext_attr.lstrip("_"), _ext_attr)
def __hash__(self): def __hash__(self):
return hash(self.code) return hash(self.code)

64
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_resource_plugin.py

@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
r"""
A tutorial example take you to experience pydolphinscheduler resource plugin.
Resource plug-ins can be defined in workflows and tasks
it will instantiate and run all the task it have.
"""
import os
from pathlib import Path
# [start tutorial_resource_plugin]
# [start package_import]
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.resources_plugin.local import Local
from pydolphinscheduler.tasks.shell import Shell
# [end package_import]
# [start workflow_declare]
with ProcessDefinition(
name="tutorial_resource_plugin",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
tenant="tenant_exists",
resource_plugin=Local("/tmp"),
) as process_definition:
# [end workflow_declare]
# [start task_declare]
file = "resource.sh"
path = Path("/tmp").joinpath(file)
with open(str(path), "w") as f:
f.write("echo tutorial resource plugin")
task_parent = Shell(
name="local-resource-example",
command=file,
)
print(task_parent.task_params)
os.remove(path)
# [end task_declare]
# [start submit_or_run]
process_definition.run()
# [end submit_or_run]
# [end tutorial_resource_plugin]

4
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py

@ -40,3 +40,7 @@ class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
class PyDSConfException(PyDSBaseException): class PyDSConfException(PyDSBaseException):
"""Exception for pydolphinscheduler configuration error.""" """Exception for pydolphinscheduler configuration error."""
class PyResPluginException(PyDSBaseException):
"""Exception for pydolphinscheduler resource plugin error."""

23
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/__init__.py

@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Init resources_plugin package."""
from pydolphinscheduler.resources_plugin.local import Local
__all__ = [
"Local",
]

57
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/resources_plugin/local.py

@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""DolphinScheduler local resource plugin."""
import os
from pathlib import Path
from pydolphinscheduler.core.resource_plugin import ResourcePlugin
from pydolphinscheduler.exceptions import PyResPluginException
class Local(ResourcePlugin):
"""Local object, declare local resource plugin for task and workflow to dolphinscheduler.
:param prefix: A string representing the prefix of Local.
"""
# [start init_method]
def __init__(self, prefix: str, *args, **kwargs):
super().__init__(prefix, *args, **kwargs)
# [end init_method]
# [start read_file_method]
def read_file(self, suf: str):
"""Get the content of the file.
The address of the file is the prefix of the resource plugin plus the parameter suf.
"""
path = Path(self.prefix).joinpath(suf)
if not path.exists():
raise PyResPluginException("{} is not found".format(str(path)))
if not os.access(str(path), os.R_OK):
raise PyResPluginException(
"You don't have permission to access {}".format(self.prefix + suf)
)
with open(path, "r") as f:
content = f.read()
return content
# [end read_file_method]

5
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py

@ -50,6 +50,9 @@ class Shell(Task):
"raw_script", "raw_script",
} }
ext: set = {".sh", ".zsh"}
ext_attr: str = "_raw_script"
def __init__(self, name: str, command: str, *args, **kwargs): def __init__(self, name: str, command: str, *args, **kwargs):
self._raw_script = command
super().__init__(name, TaskType.SHELL, *args, **kwargs) super().__init__(name, TaskType.SHELL, *args, **kwargs)
self.raw_script = command

116
dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

@ -18,12 +18,14 @@
"""Test Task class function.""" """Test Task class function."""
import logging import logging
import re import re
from unittest.mock import patch from unittest.mock import PropertyMock, patch
import pytest import pytest
from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task, TaskRelation from pydolphinscheduler.core.task import Task, TaskRelation
from pydolphinscheduler.exceptions import PyResPluginException
from pydolphinscheduler.resources_plugin import Local
from tests.testing.task import Task as testTask from tests.testing.task import Task as testTask
from tests.testing.task import TaskWithCode from tests.testing.task import TaskWithCode
@ -252,6 +254,118 @@ def test_add_duplicate(caplog):
) )
@pytest.mark.parametrize(
"val, expected",
[
("a.sh", "echo Test task attribute ext_attr"),
("a.zsh", "echo Test task attribute ext_attr"),
("echo Test task attribute ext_attr", "echo Test task attribute ext_attr"),
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
@patch(
"pydolphinscheduler.core.task.Task.ext",
new_callable=PropertyMock,
return_value={".sh", ".zsh"},
)
@patch(
"pydolphinscheduler.core.task.Task.ext_attr",
new_callable=PropertyMock,
return_value="_raw_script",
)
@patch(
"pydolphinscheduler.core.task.Task._raw_script",
create=True,
new_callable=PropertyMock,
)
@patch("pydolphinscheduler.core.task.Task.get_plugin")
def test_task_ext_attr(
m_plugin, m_raw_script, m_ext_attr, m_ext, m_code_version, val, expected
):
"""Test task attribute ext_attr."""
m_plugin.return_value.read_file.return_value = expected
m_raw_script.return_value = val
task = Task("test_task_ext_attr", "test_task_ext_attr")
assert expected == getattr(task, "raw_script")
@pytest.mark.parametrize(
"attr, expected",
[
(
{
"name": "test_task_abtain_res_plugin",
"task_type": "TaskType",
"resource_plugin": Local("prefix"),
"process_definition": ProcessDefinition(
name="process_definition",
resource_plugin=Local("prefix"),
),
},
"Local",
),
(
{
"name": "test_task_abtain_res_plugin",
"task_type": "TaskType",
"resource_plugin": Local("prefix"),
},
"Local",
),
(
{
"name": "test_task_abtain_res_plugin",
"task_type": "TaskType",
"process_definition": ProcessDefinition(
name="process_definition",
resource_plugin=Local("prefix"),
),
},
"Local",
),
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
@patch("pydolphinscheduler.core.task.Task.get_content")
def test_task_obtain_res_plugin(m_get_content, m_code_version, attr, expected):
"""Test task obtaining resource plug-in."""
task = Task(**attr)
assert expected == task.get_plugin().__class__.__name__
@pytest.mark.parametrize(
"attr",
[
{
"name": "test_task_abtain_res_plugin",
"task_type": "TaskType",
"process_definition": ProcessDefinition(
name="process_definition",
),
},
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
@patch("pydolphinscheduler.core.task.Task.get_content")
def test_task_obtain_res_plugin_exception(m_get_content, m_code_version, attr):
"""Test task obtaining resource plug-in exception."""
with pytest.raises(
PyResPluginException,
match="The execution command of this task is a file, but the resource plugin is empty",
):
task = Task(**attr)
task.get_plugin()
@pytest.mark.parametrize( @pytest.mark.parametrize(
"resources, expect", "resources, expect",
[ [

6
dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py

@ -97,7 +97,11 @@ def test_example_basic():
), f"We expect all examples is python script, but get {ex.name}." ), f"We expect all examples is python script, but get {ex.name}."
# All except tutorial and __init__ is end with keyword "_example" # All except tutorial and __init__ is end with keyword "_example"
if ex.stem not in ("tutorial", "tutorial_decorator") and ex.stem != "__init__": if (
ex.stem
not in ("tutorial", "tutorial_decorator", "tutorial_resource_plugin")
and ex.stem != "__init__"
):
assert ex.stem.endswith( assert ex.stem.endswith(
"_example" "_example"
), f"We expect all examples script end with keyword '_example', but get {ex.stem}." ), f"We expect all examples script end with keyword '_example', but get {ex.stem}."

18
dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/__init__.py

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Init resources_plugin package tests."""

108
dolphinscheduler-python/pydolphinscheduler/tests/resources_plugin/test_local.py

@ -0,0 +1,108 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test local resource plugin."""
from pathlib import Path
from unittest.mock import PropertyMock, patch
import pytest
from pydolphinscheduler.core import Task
from pydolphinscheduler.exceptions import PyResPluginException
from pydolphinscheduler.resources_plugin.local import Local
from pydolphinscheduler.utils import file
from tests.testing.file import delete_file
file_name = "local_res.sh"
file_content = "echo Test local res plugin"
res_plugin_prefix = Path(__file__).parent
file_path = res_plugin_prefix.joinpath(file_name)
@pytest.fixture()
def setup_crt_first():
"""Set up and teardown about create file first and then delete it."""
file.write(content=file_content, to_path=file_path)
yield
delete_file(file_path)
@pytest.mark.parametrize(
"val, expected",
[
(file_name, file_content),
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
@patch(
"pydolphinscheduler.core.task.Task.ext",
new_callable=PropertyMock,
return_value={
".sh",
},
)
@patch(
"pydolphinscheduler.core.task.Task.ext_attr",
new_callable=PropertyMock,
return_value="_raw_script",
)
@patch(
"pydolphinscheduler.core.task.Task._raw_script",
create=True,
new_callable=PropertyMock,
)
def test_task_obtain_res_plugin(
m_raw_script, m_ext_attr, m_ext, m_code_version, val, expected, setup_crt_first
):
"""Test task obtaining resource plug-in."""
m_raw_script.return_value = val
task = Task(
name="test_task_ext_attr",
task_type="type",
resource_plugin=Local(str(res_plugin_prefix)),
)
assert expected == getattr(task, "raw_script")
@pytest.mark.parametrize(
"attr, expected",
[({"prefix": res_plugin_prefix, "file_name": file_name}, file_content)],
)
def test_local_res_read_file(attr, expected, setup_crt_first):
"""Test the read_file function of the local resource plug-in."""
local = Local(str(attr.get("prefix")))
local.read_file(attr.get("file_name"))
assert expected == local.read_file(file_name)
@pytest.mark.parametrize(
"attr",
[
{"prefix": res_plugin_prefix, "file_name": file_name},
],
)
def test_local_res_file_not_found(attr):
"""Test local resource plugin file does not exist."""
with pytest.raises(
PyResPluginException,
match=".* is not found",
):
local = Local(str(attr.get("prefix")))
local.read_file(attr.get("file_name"))

43
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py

@ -17,12 +17,28 @@
"""Test Task shell.""" """Test Task shell."""
from pathlib import Path
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.shell import Shell from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.utils import file
from tests.testing.file import delete_file
file_name = "local_res.sh"
file_content = 'echo "test res_local"'
res_plugin_prefix = Path(__file__).parent
file_path = res_plugin_prefix.joinpath(file_name)
@pytest.fixture
def setup_crt_first():
"""Set up and teardown about create file first and then delete it."""
file.write(content=file_content, to_path=file_path)
yield
delete_file(file_path)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -90,3 +106,28 @@ def test_shell_get_define():
shell = Shell(name, command) shell = Shell(name, command)
print(shell.get_define()) print(shell.get_define())
assert shell.get_define() == expect assert shell.get_define() == expect
@pytest.mark.parametrize(
"attr, expect",
[
(
{
"name": "test-local-res-command-content",
"command": file_name,
"resource_plugin": Local(str(res_plugin_prefix)),
},
file_content,
)
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_resources_local_shell_command_content(
mock_code_version, attr, expect, setup_crt_first
):
"""Test task shell task command content through the local resource plug-in."""
task = Shell(**attr)
assert expect == getattr(task, "raw_script")

Loading…
Cancel
Save