Browse Source

[python] Add task decorator for python function (#9496)

* [python] Add task decorator for python function

* Add decorator `@task`
* Add a tutorial about it
* Change tutorial doc and combine into traditional docs
  * Add sphinx-inline-tab for better view

* revert not need change

* Correct python function indent

* Correct integration test
3.0.0/version-upgrade
Jiajie Zhong 2 years ago committed by GitHub
parent
commit
1f48601c75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py
  2. 8
      dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst
  3. 9
      dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst
  4. 33
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
  5. 1
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
  6. 233
      dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
  7. 1
      dolphinscheduler-python/pydolphinscheduler/setup.py
  8. 2
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
  9. 91
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py
  10. 61
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py
  11. 71
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
  12. 4
      dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
  13. 5
      dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
  14. 169
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py
  15. 40
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
  16. 32
      dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py

2
dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py

@ -57,6 +57,8 @@ extensions = [
"sphinx_rtd_theme", "sphinx_rtd_theme",
# Documenting command line interface # Documenting command line interface
"sphinx_click.ext", "sphinx_click.ext",
# Add inline tabbed content
"sphinx_inline_tabs",
] ]
# Add any paths that contain templates here, relative to this directory. # Add any paths that contain templates here, relative to this directory.

8
dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst

@ -15,10 +15,14 @@
specific language governing permissions and limitations specific language governing permissions and limitations
under the License. under the License.
How To HOWTOs
====== ======
In this section pydolphinscheduler HOWTOs are documents that cover a single, specific topic, and attempt to cover it fairly
completely. This collection is an effort to foster documentation that is more detailed than the :doc:`../concept`
and :doc:`../tutorial`.
Currently, the HOWTOs are:
.. toctree:: .. toctree::
:maxdepth: 2 :maxdepth: 2

9
dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst

@ -137,13 +137,16 @@ from the API server, you should first change pydolphinscheduler configuration an
You could see more information in :doc:`config` about all the configurations pydolphinscheduler supported. You could see more information in :doc:`config` about all the configurations pydolphinscheduler supported.
After that, you could go and see your DolphinScheduler web UI to find out a new workflow created by pydolphinscheduler,
and the path of web UI is `Project -> Workflow -> Workflow Definition`.
What's More What's More
----------- -----------
If you do not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` If you do not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But
and see how it work. But if you already know the inside of *PyDolphinScheduler*, if you already know the basic usage or concept of *PyDolphinScheduler*, you could go and play with all
maybe you could go and play with all :doc:`tasks/index` *PyDolphinScheduler* supports. :doc:`tasks/index` *PyDolphinScheduler* supports, or see our :doc:`howto/index` about useful cases.
.. _`instructions for all platforms here`: https://wiki.python.org/moin/BeginnersGuide/Download .. _`instructions for all platforms here`: https://wiki.python.org/moin/BeginnersGuide/Download
.. _`Apache DolphinScheduler`: https://dolphinscheduler.apache.org .. _`Apache DolphinScheduler`: https://dolphinscheduler.apache.org

33
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst

@ -0,0 +1,33 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Python Function Wrapper
=======================
A decorator covert Python function into pydolphinscheduler's task.
Example
-------
.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial_decorator.py
:start-after: [start tutorial]
:end-before: [end tutorial]
Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.func_wrap

1
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst

@ -23,6 +23,7 @@ In this section
.. toctree:: .. toctree::
:maxdepth: 1 :maxdepth: 1
func_wrap
shell shell
sql sql
python python

233
dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst

@ -18,129 +18,202 @@
Tutorial Tutorial
======== ========
This tutorial show you the basic concept of *PyDolphinScheduler* and tell all This tutorial shows you the basic concept of *PyDolphinScheduler* and tells all
things you should know before you submit or run your first workflow. If you things you should know before you submit or run your first workflow. If you
still not install *PyDolphinScheduler* and start Apache DolphinScheduler, you still have not installed *PyDolphinScheduler* and start DolphinScheduler, you
could go and see :ref:`how to getting start PyDolphinScheduler <start:getting started>` could go and see :ref:`how to getting start PyDolphinScheduler <start:getting started>` firstly.
Overview of Tutorial Overview of Tutorial
-------------------- --------------------
Here have an overview of our tutorial, and it look a little complex but do not Here have an overview of our tutorial, and it looks a little complex but does not
worry about that because we explain this example below as detailed as possible. worry about that because we explain this example below as detail as possible.
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py There are two types of tutorials: traditional and task decorator.
:start-after: [start tutorial]
:end-before: [end tutorial] - **Traditional Way**: More general, support many :doc:`built-in task types <tasks/index>`, it is convenient
when you build your workflow at the beginning.
- **Task Decorator**: A Python decorator allow you to wrap your function into pydolphinscheduler's task. Less
versatility to the traditional way because it only supported Python functions and without build-in tasks
supported. But it is helpful if your workflow is all built with Python or if you already have some Python
workflow code and want to migrate them to pydolphinscheduler.
.. tab:: Tradition
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
:dedent: 0
:start-after: [start tutorial]
:end-before: [end tutorial]
.. tab:: Task Decorator
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
:dedent: 0
:start-after: [start tutorial]
:end-before: [end tutorial]
Import Necessary Module Import Necessary Module
----------------------- -----------------------
First of all, we should importing necessary module which we would use later just First of all, we should import the necessary module which we would use later just like other Python packages.
like other Python package. We just create a minimum demo here, so we just import
:class:`pydolphinscheduler.core.process_definition` and
:class:`pydolphinscheduler.tasks.shell`.
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py .. tab:: Tradition
:start-after: [start package_import]
:end-before: [end package_import] .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
:dedent: 0
:start-after: [start package_import]
:end-before: [end package_import]
If you want to use other task type you could click and In tradition tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
:doc:`see all tasks we support <tasks/index>` :class:`pydolphinscheduler.tasks.shell.Shell`.
If you want to use other task type you could click and :doc:`see all tasks we support <tasks/index>`
.. tab:: Task Decorator
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
:dedent: 0
:start-after: [start package_import]
:end-before: [end package_import]
In task decorator tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
:func:`pydolphinscheduler.tasks.func_wrap.task`.
Process Definition Declaration Process Definition Declaration
------------------------------ ------------------------------
We should instantiate object after we import them from `import necessary module`_. We should instantiate :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` object after we
Here we declare basic arguments for process definition(aka, workflow). We define import them from `import necessary module`_. Here we declare basic arguments for process definition(aka, workflow).
the name of process definition, using `Python context manager`_ and it We define the name of :code:`ProcessDefinition`, using `Python context manager`_ and it **the only required argument**
**the only required argument** for object process definition. Beside that we also for `ProcessDefinition`. Besides, we also declare three arguments named :code:`schedule` and :code:`start_time`
declare three arguments named `schedule`, `start_time` which setting workflow schedule which setting workflow schedule interval and schedule start_time, and argument :code:`tenant` defines which tenant
interval and schedule start_time, and argument `tenant` which changing workflow's will be running this task in the DolphinScheduler worker. See :ref:`section tenant <concept:tenant>` in
task running user in the worker, :ref:`section tenant <concept:tenant>` in *PyDolphinScheduler* *PyDolphinScheduler* :doc:`concept` for more information.
:doc:`concept` page have more detail information.
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py .. tab:: Tradition
:start-after: [start workflow_declare]
:end-before: [end workflow_declare] .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
:dedent: 0
:start-after: [start workflow_declare]
:end-before: [end workflow_declare]
.. tab:: Task Decorator
We could find more detail about process definition in .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
:ref:`concept about process definition <concept:process definition>` if you interested in it. :dedent: 0
For all arguments of object process definition, you could find in the :start-after: [start workflow_declare]
:class:`pydolphinscheduler.core.process_definition` api documentation. :end-before: [end workflow_declare]
We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
if you are interested in it. For all arguments of object process definition, you could find in the
:class:`pydolphinscheduler.core.process_definition` API documentation.
Task Declaration Task Declaration
---------------- ----------------
Here we declare four tasks, and bot of them are simple task of .. tab:: Tradition
:class:`pydolphinscheduler.tasks.shell` which running `echo` command in terminal.
Beside the argument `command`, we also need setting argument `name` for each task *(not
only shell task, `name` is required for each type of task)*.
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py We declare four tasks to show how to create tasks, and both of them are simple tasks of
:dedent: 0 :class:`pydolphinscheduler.tasks.shell` which runs `echo` command in the terminal. Besides the argument
:start-after: [start task_declare] `command` with :code:`echo` command, we also need to set the argument `name` for each task
:end-before: [end task_declare] *(not only shell task, `name` is required for each type of task)*.
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
:dedent: 0
:start-after: [start task_declare]
:end-before: [end task_declare]
Besides shell task, *PyDolphinScheduler* supports multiple tasks and you could find in :doc:`tasks/index`.
.. tab:: Task Decorator
Beside shell task, *PyDolphinScheduler* support multiple tasks and you could We declare four tasks to show how to create tasks, and both of them are created by the task decorator which
find in :doc:`tasks/index`. using :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is add a decorator named
:code:`@task` to existing Python function, and then use them inside :class:`pydolphinscheduler.core.process_definition`
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
:dedent: 0
:start-after: [start task_declare]
:end-before: [end task_declare]
It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we only use
Python function as a task and could not use the :doc:`built-in tasks <tasks/index>` most of the cases.
Setting Task Dependence Setting Task Dependence
----------------------- -----------------------
After we declare both process definition and task, we have one workflow with After we declare both process definition and task, we have four tasks that are independent and will be running
four tasks, both all tasks is independent so that they would run in parallel. in parallel. If you want to start one task until some task is finished, you have to set dependence on those
We should reorder the sort and the dependence of tasks. It useful when we need tasks.
run prepare task before we run actual task or we need tasks running is specific
rule. We both support attribute `set_downstream` and `set_upstream`, or bitwise
operators `>>` and `<<`.
In this example, we set task `task_parent` is the upstream task of task Set task dependence is quite easy by task's attribute :code:`set_downstream` and :code:`set_upstream` or by
`task_child_one` and `task_child_two`, and task `task_union` is the downstream bitwise operators :code:`>>` and :code:`<<`
task of both these two task.
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py In this tutorial, task `task_parent` is the leading task of the whole workflow, then task `task_child_one` and
:dedent: 0 task `task_child_two` are its downstream tasks. Task `task_union` will not run unless both task `task_child_one`
:start-after: [start task_relation_declare] and task `task_child_two` was done, because both two task is `task_union`'s upstream.
:end-before: [end task_relation_declare]
.. tab:: Tradition
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
:dedent: 0
:start-after: [start task_relation_declare]
:end-before: [end task_relation_declare]
Please notice that we could grouping some tasks and set dependence if they have .. tab:: Task Decorator
same downstream or upstream. We declare task `task_child_one` and `task_child_two`
as a group here, named as `task_group` and set task `task_parent` as upstream of .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
both of them. You could see more detail in :ref:`concept:Tasks Dependence` section in concept :dedent: 0
documentation. :start-after: [start task_relation_declare]
:end-before: [end task_relation_declare]
.. note::
We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
tasks as task groups. In tutorial, We declare task `task_child_one` and `task_child_two` as task group named
`task_group`, then set `task_group` as downstream of task `task_parent`. You could see more detail in
:ref:`concept:Tasks Dependence` for more detail about how to set task dependence.
Submit Or Run Workflow Submit Or Run Workflow
---------------------- ----------------------
Now we finish our workflow definition, with task and task dependence, but all After that, we finish our workflow definition, with four tasks and task dependence, but all these things are
these things are in local, we should let Apache DolphinScheduler daemon know what we local, we should let the DolphinScheduler daemon know how the definition of workflow. So the last thing we
define our workflow. So the last thing we have to do here is submit our workflow to have to do is submit the workflow to the DolphinScheduler daemon.
Apache DolphinScheduler daemon.
We here in the example using `ProcessDefinition` attribute `run` to submit workflow Fortunately, we have a convenient method to submit workflow via `ProcessDefinition` attribute :code:`run` which
to the daemon, and set the schedule time we just declare in `process definition declaration`_. will create workflow definition as well as workflow schedule.
Now, we could run the Python code like other Python script, for the basic usage run .. tab:: Tradition
:code:`python tutorial.py` to trigger and run it.
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
:dedent: 0
:start-after: [start submit_or_run]
:end-before: [end submit_or_run]
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py .. tab:: Task Decorator
:dedent: 0
:start-after: [start submit_or_run] .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
:end-before: [end submit_or_run] :dedent: 0
:start-after: [start submit_or_run]
:end-before: [end submit_or_run]
At last, we could execute this workflow code in your terminal like other Python scripts, running
:code:`python tutorial.py` to trigger and execute it.
.. note::
If you not start your Apache DolphinScheduler server, you could find the way in If you do not start your DolphinScheduler API server, you could find how to start it in
:ref:`start:start Python gateway service` and it would have more detail about related server :ref:`start:start Python gateway service` for more detail. Besides attribute :code:`run`, we have attribute
start. Beside attribute `run`, we have attribute `submit` for object `ProcessDefinition` :code:`submit` for object `ProcessDefinition` which just submits workflow to the daemon but does not set
and it just submit workflow to the daemon but not setting the schedule information. For the workflow schedule information. For more detail, you could see :ref:`concept:process definition`.
more detail you could see :ref:`concept:process definition`.
DAG Graph After Tutorial Run DAG Graph After Tutorial Run
---------------------------- ----------------------------
After we run the tutorial code, you could login Apache DolphinScheduler web UI, After we run the tutorial code, you could log in DolphinScheduler web UI, go and see the
go and see the `DolphinScheduler project page`_. they is a new process definition be `DolphinScheduler project page`_. They is a new process definition be created by *PyDolphinScheduler* and it
created and named "Tutorial". It create by *PyDolphinScheduler* and the DAG graph as below named "tutorial" or "tutorial_decorator". The task graph of workflow like below:
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
:language: text :language: text

1
dolphinscheduler-python/pydolphinscheduler/setup.py

@ -51,6 +51,7 @@ doc = [
"sphinx>=4.3", "sphinx>=4.3",
"sphinx_rtd_theme>=1.0", "sphinx_rtd_theme>=1.0",
"sphinx-click>=3.0", "sphinx-click>=3.0",
"sphinx-inline-tabs",
] ]
test = [ test = [

2
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py

@ -18,10 +18,12 @@
"""Init pydolphinscheduler.core package.""" """Init pydolphinscheduler.core package."""
from pydolphinscheduler.core.database import Database from pydolphinscheduler.core.database import Database
from pydolphinscheduler.core.engine import Engine
from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task from pydolphinscheduler.core.task import Task
__all__ = [ __all__ = [
"Engine",
"ProcessDefinition", "ProcessDefinition",
"Task", "Task",
"Database", "Database",

91
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py

@ -0,0 +1,91 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
r"""
A tutorial example take you to experience pydolphinscheduler.
After tutorial.py file submit to Apache DolphinScheduler server a DAG would be create,
and workflow DAG graph as below:
--> task_child_one
/ \
task_parent --> --> task_union
\ /
--> task_child_two
it will instantiate and run all the task it have.
"""
# [start tutorial]
# [start package_import]
# Import ProcessDefinition object to define your workflow attributes
from pydolphinscheduler.core.process_definition import ProcessDefinition
# Import task Shell object cause we would create some shell tasks later
from pydolphinscheduler.tasks.func_wrap import task
# [end package_import]
# [start task_declare]
@task
def task_parent():
"""First task in this workflow."""
print("echo hello pydolphinscheduler")
@task
def task_child_one():
"""Child task will be run parallel after task ``task_parent`` finished."""
print("echo 'child one'")
@task
def task_child_two():
"""Child task will be run parallel after task ``task_parent`` finished."""
print("echo 'child two'")
@task
def task_union():
"""Last task in this workflow."""
print("echo union")
# [end task_declare]
# [start workflow_declare]
with ProcessDefinition(
name="tutorial_decorator",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
tenant="tenant_exists",
) as pd:
# [end workflow_declare]
# [start task_relation_declare]
task_group = [task_child_one(), task_child_two()]
task_parent().set_downstream(task_group)
task_union() << task_group
# [end task_relation_declare]
# [start submit_or_run]
pd.run()
# [end submit_or_run]
# [end tutorial]

61
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py

@ -0,0 +1,61 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task function wrapper allows using decorator to create a task."""
import functools
import inspect
import itertools
import types
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.python import Python
def _get_func_str(func: types.FunctionType) -> str:
"""Get Python function string without indent from decorator.
:param func: The function which wraps by decorator ``@task``.
"""
lines = inspect.getsourcelines(func)[0]
src_strip = ""
lead_space_num = None
for line in lines:
if lead_space_num is None:
lead_space_num = sum(1 for _ in itertools.takewhile(str.isspace, line))
if line.strip() == "@task":
continue
elif line.strip().startswith("@"):
raise PyDSParamException(
"Do no support other decorators for function ``task`` decorator."
)
src_strip += line[lead_space_num:]
return src_strip
def task(func: types.FunctionType):
"""Decorate which covert Python function into pydolphinscheduler task."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_str = _get_func_str(func)
return Python(
name=kwargs.get("name", func.__name__), definition=func_str, *args, **kwargs
)
return wrapper

71
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py

@ -18,34 +18,85 @@
"""Task Python.""" """Task Python."""
import inspect import inspect
import logging
import re
import types import types
from typing import Any from typing import Union
from pydolphinscheduler.constants import TaskType from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.exceptions import PyDSParamException
log = logging.getLogger(__file__)
class Python(Task): class Python(Task):
"""Task Python object, declare behavior for Python task to dolphinscheduler.""" """Task Python object, declare behavior for Python task to dolphinscheduler.
Python task support two types of parameters for :param:``code``, and here is an example:
Using str type of :param:``code``
.. code-block:: python
python_task = Python(name="str_type", code="print('Hello Python task.')")
Or using Python callable type of :param:``code``
.. code-block:: python
def foo():
print("Hello Python task.")
python_task = Python(name="str_type", code=foo)
:param name: The name for Python task. It define the task name.
:param definition: String format of Python script you want to execute or Python callable you
want to execute.
"""
_task_custom_attr = { _task_custom_attr = {
"raw_script", "raw_script",
} }
def __init__(self, name: str, code: Any, *args, **kwargs): def __init__(
self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
):
super().__init__(name, TaskType.PYTHON, *args, **kwargs) super().__init__(name, TaskType.PYTHON, *args, **kwargs)
self._code = code self.definition = definition
def _build_exe_str(self) -> str:
"""Build executable string from given definition.
Attribute ``self.definition`` almost is a function, we need to call this function after parsing it
to string. The easier way to call a function is using syntax ``func()`` and we use it to call it too.
"""
if isinstance(self.definition, types.FunctionType):
py_function = inspect.getsource(self.definition)
func_str = f"{py_function}{self.definition.__name__}()"
else:
pattern = re.compile("^def (\\w+)\\(")
find = pattern.findall(self.definition)
if not find:
log.warning(
"Python definition is simple script instead of function, with value %s",
self.definition,
)
return self.definition
# Keep function str and function callable always have one blank line
func_str = (
f"{self.definition}{find[0]}()"
if self.definition.endswith("\n")
else f"{self.definition}\n{find[0]}()"
)
return func_str
@property @property
def raw_script(self) -> str: def raw_script(self) -> str:
"""Get python task define attribute `raw_script`.""" """Get python task define attribute `raw_script`."""
if isinstance(self._code, str): if isinstance(self.definition, (str, types.FunctionType)):
return self._code return self._build_exe_str()
elif isinstance(self._code, types.FunctionType):
py_function = inspect.getsource(self._code)
return py_function
else: else:
raise PyDSParamException( raise PyDSParamException(
"Parameter code do not support % for now.", type(self._code) "Parameter definition do not support % for now.", type(self.definition)
) )

4
dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py

@ -44,7 +44,7 @@ def test_task_without_example():
Avoiding add new type of tasks but without adding example describe how to use it. Avoiding add new type of tasks but without adding example describe how to use it.
""" """
# We use example/tutorial.py as shell task example # We use example/tutorial.py as shell task example
ignore_name = {"__init__.py", "shell.py"} ignore_name = {"__init__.py", "shell.py", "func_wrap.py"}
all_tasks = {task.stem for task in get_tasks(ignore_name=ignore_name)} all_tasks = {task.stem for task in get_tasks(ignore_name=ignore_name)}
have_example_tasks = set() have_example_tasks = set()
@ -97,7 +97,7 @@ def test_example_basic():
), f"We expect all examples is python script, but get {ex.name}." ), f"We expect all examples is python script, but get {ex.name}."
# All except tutorial and __init__ is end with keyword "_example" # All except tutorial and __init__ is end with keyword "_example"
if ex.stem != "tutorial" and ex.stem != "__init__": if ex.stem not in ("tutorial", "tutorial_decorator") and ex.stem != "__init__":
assert ex.stem.endswith( assert ex.stem.endswith(
"_example" "_example"
), f"We expect all examples script end with keyword '_example', but get {ex.stem}." ), f"We expect all examples script end with keyword '_example', but get {ex.stem}."

5
dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py

@ -18,6 +18,7 @@
"""Test whether success submit examples DAG to PythonGatewayService.""" """Test whether success submit examples DAG to PythonGatewayService."""
from pathlib import Path from pathlib import Path
from subprocess import Popen
import pytest import pytest
@ -36,6 +37,8 @@ from tests.testing.path import path_example
def test_exec_white_list_example(example_path: Path): def test_exec_white_list_example(example_path: Path):
"""Test execute examples and submit DAG to PythonGatewayService.""" """Test execute examples and submit DAG to PythonGatewayService."""
try: try:
exec(example_path.read_text()) # Because our task decorator used module ``inspect`` to get the source, and it will
# raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.Popen``
Popen(["python", str(example_path)])
except Exception: except Exception:
raise Exception("Run example %s failed.", example_path.stem) raise Exception("Run example %s failed.", example_path.stem)

169
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py

@ -0,0 +1,169 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test module about function wrap task decorator."""
from unittest.mock import patch
import pytest
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.func_wrap import task
from tests.testing.decorator import foo as foo_decorator
from tests.testing.task import Task
PD_NAME = "test_process_definition"
TASK_NAME = "test_task"
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
)
def test_single_task_outside(mock_code):
"""Test single decorator task which outside process definition."""
@task
def foo():
print(TASK_NAME)
with ProcessDefinition(PD_NAME) as pd:
foo()
assert pd is not None and pd.name == PD_NAME
assert len(pd.tasks) == 1
pd_task = pd.tasks[12345]
assert pd_task.name == "foo"
assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
)
def test_single_task_inside(mock_code):
"""Test single decorator task which inside process definition."""
with ProcessDefinition(PD_NAME) as pd:
@task
def foo():
print(TASK_NAME)
foo()
assert pd is not None and pd.name == PD_NAME
assert len(pd.tasks) == 1
pd_task = pd.tasks[12345]
assert pd_task.name == "foo"
assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
)
def test_addition_decorator_error(mock_code):
"""Test error when using task decorator to a function already have decorator."""
@task
@foo_decorator
def foo():
print(TASK_NAME)
with ProcessDefinition(PD_NAME) as pd: # noqa: F841
with pytest.raises(
PyDSParamException, match="Do no support other decorators for.*"
):
foo()
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
)
def test_multiple_tasks_outside(mock_code):
"""Test multiple decorator tasks which outside process definition."""
@task
def foo():
print(TASK_NAME)
@task
def bar():
print(TASK_NAME)
with ProcessDefinition(PD_NAME) as pd:
foo = foo()
bar = bar()
foo >> bar
assert pd is not None and pd.name == PD_NAME
assert len(pd.tasks) == 2
task_foo = pd.get_one_task_by_name("foo")
task_bar = pd.get_one_task_by_name("bar")
assert set(pd.task_list) == {task_foo, task_bar}
assert (
task_foo is not None
and task_foo._upstream_task_codes == set()
and task_foo._downstream_task_codes.pop() == task_bar.code
)
assert (
task_bar is not None
and task_bar._upstream_task_codes.pop() == task_foo.code
and task_bar._downstream_task_codes == set()
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
)
def test_multiple_tasks_inside(mock_code):
"""Test multiple decorator tasks which inside process definition."""
with ProcessDefinition(PD_NAME) as pd:
@task
def foo():
print(TASK_NAME)
@task
def bar():
print(TASK_NAME)
foo = foo()
bar = bar()
foo >> bar
assert pd is not None and pd.name == PD_NAME
assert len(pd.tasks) == 2
task_foo = pd.get_one_task_by_name("foo")
task_bar = pd.get_one_task_by_name("bar")
assert set(pd.task_list) == {task_foo, task_bar}
assert (
task_foo is not None
and task_foo._upstream_task_codes == set()
and task_foo._downstream_task_codes.pop() == task_bar.code
)
assert (
task_bar is not None
and task_bar._upstream_task_codes.pop() == task_foo.code
and task_bar._downstream_task_codes == set()
)

40
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py

@ -26,11 +26,15 @@ from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.python import Python from pydolphinscheduler.tasks.python import Python
def foo(): # noqa: D103
print("hello world.")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"attr, expect", "attr, expect",
[ [
( (
{"code": "print(1)"}, {"definition": "print(1)"},
{ {
"rawScript": "print(1)", "rawScript": "print(1)",
"localParams": [], "localParams": [],
@ -39,7 +43,29 @@ from pydolphinscheduler.tasks.python import Python
"waitStartTimeout": {}, "waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]}, "conditionResult": {"successNode": [""], "failedNode": [""]},
}, },
) ),
(
{"definition": "def foo():\n print('I am foo')"},
{
"rawScript": "def foo():\n print('I am foo')\nfoo()",
"localParams": [],
"resourceList": [],
"dependence": {},
"waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
},
),
(
{"definition": foo},
{
"rawScript": 'def foo(): # noqa: D103\n print("hello world.")\nfoo()',
"localParams": [],
"resourceList": [],
"dependence": {},
"waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
},
),
], ],
) )
@patch( @patch(
@ -66,15 +92,13 @@ def test_property_task_params(mock_code_version, attr, expect):
def test_python_task_not_support_code(mock_code, script_code): def test_python_task_not_support_code(mock_code, script_code):
"""Test python task parameters.""" """Test python task parameters."""
name = "not_support_code_type" name = "not_support_code_type"
with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"): with pytest.raises(
PyDSParamException, match="Parameter definition do not support .*?"
):
task = Python(name, script_code) task = Python(name, script_code)
task.raw_script task.raw_script
def foo(): # noqa: D103
print("hello world.")
@pytest.mark.parametrize( @pytest.mark.parametrize(
"name, script_code, raw", "name, script_code, raw",
[ [
@ -82,7 +106,7 @@ def foo(): # noqa: D103
( (
"function_define", "function_define",
foo, foo,
'def foo(): # noqa: D103\n print("hello world.")\n', 'def foo(): # noqa: D103\n print("hello world.")\nfoo()',
), ),
], ],
) )

32
dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py

@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Decorator module for testing module."""
import types
from functools import wraps
def foo(func: types.FunctionType):
"""Decorate which do nothing for testing module."""
@wraps(func)
def wrapper():
print("foo decorator called.")
func()
return wrapper
Loading…
Cancel
Save