From 1f48601c759df8dae82e966bed1a346416c7449a Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Fri, 15 Apr 2022 15:50:52 +0800 Subject: [PATCH] [python] Add task decorator for python function (#9496) * [python] Add task decorator for python function * Add decorator `@task` * Add a tutorial about it * Change tutorial doc and combine into traditional docs * Add sphinx-inline-tab for better view * revert not need change * Correct python function indent * Correct integration test --- .../pydolphinscheduler/docs/source/conf.py | 2 + .../docs/source/howto/index.rst | 8 +- .../pydolphinscheduler/docs/source/start.rst | 9 +- .../docs/source/tasks/func_wrap.rst | 33 +++ .../docs/source/tasks/index.rst | 1 + .../docs/source/tutorial.rst | 233 ++++++++++++------ .../pydolphinscheduler/setup.py | 1 + .../src/pydolphinscheduler/core/__init__.py | 2 + .../examples/tutorial_decorator.py | 91 +++++++ .../src/pydolphinscheduler/tasks/func_wrap.py | 61 +++++ .../src/pydolphinscheduler/tasks/python.py | 71 +++++- .../tests/example/test_example.py | 4 +- .../tests/integration/test_submit_examples.py | 5 +- .../tests/tasks/test_func_wrap.py | 169 +++++++++++++ .../tests/tasks/test_python.py | 40 ++- .../tests/testing/decorator.py | 32 +++ 16 files changed, 656 insertions(+), 106 deletions(-) create mode 100644 dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py b/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py index e22b3bb1b8..efede5c299 100644 --- 
a/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py @@ -57,6 +57,8 @@ extensions = [ "sphinx_rtd_theme", # Documenting command line interface "sphinx_click.ext", + # Add inline tabbed content + "sphinx_inline_tabs", ] # Add any paths that contain templates here, relative to this directory. diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst index e83b1631cf..a0b3c29c0c 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst @@ -15,10 +15,14 @@ specific language governing permissions and limitations under the License. -How To +HOWTOs ====== -In this section +pydolphinscheduler HOWTOs are documents that cover a single, specific topic, and attempt to cover it fairly +completely. This collection is an effort to foster documentation that is more detailed than the :doc:`../concept` +and :doc:`../tutorial`. + +Currently, the HOWTOs are: .. toctree:: :maxdepth: 2 diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst index e411d7bf72..6663c085e9 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst @@ -137,13 +137,16 @@ from the API server, you should first change pydolphinscheduler configuration an You could see more information in :doc:`config` about all the configurations pydolphinscheduler supported. +After that, you could go and see your DolphinScheduler web UI to find out a new workflow created by pydolphinscheduler, +and the path of web UI is `Project -> Workflow -> Workflow Definition`. 
+ +What's More ----------- -If you do not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` -and see how it work. But if you already know the inside of *PyDolphinScheduler*, -maybe you could go and play with all :doc:`tasks/index` *PyDolphinScheduler* supports. +If you are not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But +if you already know the basic usage or concept of *PyDolphinScheduler*, you could go and play with all +:doc:`tasks/index` *PyDolphinScheduler* supports, or see our :doc:`howto/index` about useful cases. .. _`instructions for all platforms here`: https://wiki.python.org/moin/BeginnersGuide/Download .. _`Apache DolphinScheduler`: https://dolphinscheduler.apache.org diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst new file mode 100644 index 0000000000..5f41b80cfd --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst @@ -0,0 +1,33 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Python Function Wrapper +======================= + +A decorator that converts a Python function into a pydolphinscheduler task. 
+ +Example +------- + +.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial_decorator.py + :start-after: [start tutorial] + :end-before: [end tutorial] + +Dive Into +--------- + +.. automodule:: pydolphinscheduler.tasks.func_wrap \ No newline at end of file diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst index 42dcdf9c8c..d6bbb960c1 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst @@ -23,6 +23,7 @@ In this section .. toctree:: :maxdepth: 1 + func_wrap shell sql python diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst index e0f22fb816..6366c803bb 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst @@ -18,129 +18,202 @@ Tutorial ======== -This tutorial show you the basic concept of *PyDolphinScheduler* and tell all +This tutorial shows you the basic concept of *PyDolphinScheduler* and tells all things you should know before you submit or run your first workflow. If you -still not install *PyDolphinScheduler* and start Apache DolphinScheduler, you -could go and see :ref:`how to getting start PyDolphinScheduler ` +still have not installed *PyDolphinScheduler* and start DolphinScheduler, you +could go and see :ref:`how to getting start PyDolphinScheduler ` firstly. Overview of Tutorial -------------------- -Here have an overview of our tutorial, and it look a little complex but do not -worry about that because we explain this example below as detailed as possible. +Here have an overview of our tutorial, and it looks a little complex but does not +worry about that because we explain this example below as detail as possible. -.. 
literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py - :start-after: [start tutorial] - :end-before: [end tutorial] +There are two types of tutorials: traditional and task decorator. + +- **Traditional Way**: More general, support many :doc:`built-in task types `, it is convenient + when you build your workflow at the beginning. +- **Task Decorator**: A Python decorator allow you to wrap your function into pydolphinscheduler's task. Less + versatility to the traditional way because it only supported Python functions and without build-in tasks + supported. But it is helpful if your workflow is all built with Python or if you already have some Python + workflow code and want to migrate them to pydolphinscheduler. + +.. tab:: Tradition + + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py + :dedent: 0 + :start-after: [start tutorial] + :end-before: [end tutorial] + +.. tab:: Task Decorator + + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py + :dedent: 0 + :start-after: [start tutorial] + :end-before: [end tutorial] Import Necessary Module ----------------------- -First of all, we should importing necessary module which we would use later just -like other Python package. We just create a minimum demo here, so we just import -:class:`pydolphinscheduler.core.process_definition` and -:class:`pydolphinscheduler.tasks.shell`. +First of all, we should import the necessary module which we would use later just like other Python packages. -.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py - :start-after: [start package_import] - :end-before: [end package_import] +.. tab:: Tradition + + .. 
literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py + :dedent: 0 + :start-after: [start package_import] + :end-before: [end package_import] -If you want to use other task type you could click and -:doc:`see all tasks we support ` + In tradition tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and + :class:`pydolphinscheduler.tasks.shell.Shell`. + + If you want to use other task type you could click and :doc:`see all tasks we support ` + +.. tab:: Task Decorator + + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py + :dedent: 0 + :start-after: [start package_import] + :end-before: [end package_import] + + In task decorator tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and + :func:`pydolphinscheduler.tasks.func_wrap.task`. Process Definition Declaration ------------------------------ -We should instantiate object after we import them from `import necessary module`_. -Here we declare basic arguments for process definition(aka, workflow). We define -the name of process definition, using `Python context manager`_ and it -**the only required argument** for object process definition. Beside that we also -declare three arguments named `schedule`, `start_time` which setting workflow schedule -interval and schedule start_time, and argument `tenant` which changing workflow's -task running user in the worker, :ref:`section tenant ` in *PyDolphinScheduler* -:doc:`concept` page have more detail information. +We should instantiate :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` object after we +import them from `import necessary module`_. Here we declare basic arguments for process definition(aka, workflow). +We define the name of :code:`ProcessDefinition`, using `Python context manager`_ and it **the only required argument** +for `ProcessDefinition`. 
Besides, we also declare three arguments named :code:`schedule` and :code:`start_time` +which setting workflow schedule interval and schedule start_time, and argument :code:`tenant` defines which tenant +will be running this task in the DolphinScheduler worker. See :ref:`section tenant ` in +*PyDolphinScheduler* :doc:`concept` for more information. -.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py - :start-after: [start workflow_declare] - :end-before: [end workflow_declare] +.. tab:: Tradition + + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py + :dedent: 0 + :start-after: [start workflow_declare] + :end-before: [end workflow_declare] + +.. tab:: Task Decorator -We could find more detail about process definition in -:ref:`concept about process definition ` if you interested in it. -For all arguments of object process definition, you could find in the -:class:`pydolphinscheduler.core.process_definition` api documentation. + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py + :dedent: 0 + :start-after: [start workflow_declare] + :end-before: [end workflow_declare] + +We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition ` +if you are interested in it. For all arguments of object process definition, you could find in the +:class:`pydolphinscheduler.core.process_definition` API documentation. Task Declaration ---------------- -Here we declare four tasks, and bot of them are simple task of -:class:`pydolphinscheduler.tasks.shell` which running `echo` command in terminal. -Beside the argument `command`, we also need setting argument `name` for each task *(not -only shell task, `name` is required for each type of task)*. +.. tab:: Tradition -.. 
literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py - :dedent: 0 - :start-after: [start task_declare] - :end-before: [end task_declare] + We declare four tasks to show how to create tasks, and both of them are simple tasks of + :class:`pydolphinscheduler.tasks.shell` which runs `echo` command in the terminal. Besides the argument + `command` with :code:`echo` command, we also need to set the argument `name` for each task + *(not only shell task, `name` is required for each type of task)*. + + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py + :dedent: 0 + :start-after: [start task_declare] + :end-before: [end task_declare] + + Besides shell task, *PyDolphinScheduler* supports multiple tasks and you could find in :doc:`tasks/index`. + +.. tab:: Task Decorator -Beside shell task, *PyDolphinScheduler* support multiple tasks and you could -find in :doc:`tasks/index`. + We declare four tasks to show how to create tasks, and both of them are created by the task decorator which + using :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is add a decorator named + :code:`@task` to existing Python function, and then use them inside :class:`pydolphinscheduler.core.process_definition` + + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py + :dedent: 0 + :start-after: [start task_declare] + :end-before: [end task_declare] + + It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we only use + Python function as a task and could not use the :doc:`built-in tasks ` most of the cases. Setting Task Dependence ----------------------- -After we declare both process definition and task, we have one workflow with -four tasks, both all tasks is independent so that they would run in parallel. -We should reorder the sort and the dependence of tasks. It useful when we need -run prepare task before we run actual task or we need tasks running is specific -rule. 
We both support attribute `set_downstream` and `set_upstream`, or bitwise -operators `>>` and `<<`. +After we declare both process definition and task, we have four tasks that are independent and will be running +in parallel. If you want to start one task until some task is finished, you have to set dependence on those +tasks. -In this example, we set task `task_parent` is the upstream task of task -`task_child_one` and `task_child_two`, and task `task_union` is the downstream -task of both these two task. +Set task dependence is quite easy by task's attribute :code:`set_downstream` and :code:`set_upstream` or by +bitwise operators :code:`>>` and :code:`<<` -.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py - :dedent: 0 - :start-after: [start task_relation_declare] - :end-before: [end task_relation_declare] +In this tutorial, task `task_parent` is the leading task of the whole workflow, then task `task_child_one` and +task `task_child_two` are its downstream tasks. Task `task_union` will not run unless both task `task_child_one` +and task `task_child_two` was done, because both two task is `task_union`'s upstream. + +.. tab:: Tradition + + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py + :dedent: 0 + :start-after: [start task_relation_declare] + :end-before: [end task_relation_declare] -Please notice that we could grouping some tasks and set dependence if they have -same downstream or upstream. We declare task `task_child_one` and `task_child_two` -as a group here, named as `task_group` and set task `task_parent` as upstream of -both of them. You could see more detail in :ref:`concept:Tasks Dependence` section in concept -documentation. +.. tab:: Task Decorator + + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py + :dedent: 0 + :start-after: [start task_relation_declare] + :end-before: [end task_relation_declare] + +.. 
note:: + + We could set task dependence in batch mode if they have the same downstream or upstream by declaring those + tasks as task groups. In tutorial, We declare task `task_child_one` and `task_child_two` as task group named + `task_group`, then set `task_group` as downstream of task `task_parent`. You could see more detail in + :ref:`concept:Tasks Dependence` for more detail about how to set task dependence. Submit Or Run Workflow ---------------------- -Now we finish our workflow definition, with task and task dependence, but all -these things are in local, we should let Apache DolphinScheduler daemon know what we -define our workflow. So the last thing we have to do here is submit our workflow to -Apache DolphinScheduler daemon. +After that, we finish our workflow definition, with four tasks and task dependence, but all these things are +local, we should let the DolphinScheduler daemon know how the definition of workflow. So the last thing we +have to do is submit the workflow to the DolphinScheduler daemon. -We here in the example using `ProcessDefinition` attribute `run` to submit workflow -to the daemon, and set the schedule time we just declare in `process definition declaration`_. +Fortunately, we have a convenient method to submit workflow via `ProcessDefinition` attribute :code:`run` which +will create workflow definition as well as workflow schedule. -Now, we could run the Python code like other Python script, for the basic usage run -:code:`python tutorial.py` to trigger and run it. +.. tab:: Tradition + + .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py + :dedent: 0 + :start-after: [start submit_or_run] + :end-before: [end submit_or_run] -.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py - :dedent: 0 - :start-after: [start submit_or_run] - :end-before: [end submit_or_run] +.. tab:: Task Decorator + + .. 
literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py + :dedent: 0 + :start-after: [start submit_or_run] + :end-before: [end submit_or_run] + +At last, we could execute this workflow code in your terminal like other Python scripts, running +:code:`python tutorial.py` to trigger and execute it. + +.. note:: -If you not start your Apache DolphinScheduler server, you could find the way in -:ref:`start:start Python gateway service` and it would have more detail about related server -start. Beside attribute `run`, we have attribute `submit` for object `ProcessDefinition` -and it just submit workflow to the daemon but not setting the schedule information. For -more detail you could see :ref:`concept:process definition`. + If you do not start your DolphinScheduler API server, you could find how to start it in + :ref:`start:start Python gateway service` for more detail. Besides attribute :code:`run`, we have attribute + :code:`submit` for object `ProcessDefinition` which just submits workflow to the daemon but does not set + the workflow schedule information. For more detail, you could see :ref:`concept:process definition`. DAG Graph After Tutorial Run ---------------------------- -After we run the tutorial code, you could login Apache DolphinScheduler web UI, -go and see the `DolphinScheduler project page`_. they is a new process definition be -created and named "Tutorial". It create by *PyDolphinScheduler* and the DAG graph as below +After we run the tutorial code, you could log in DolphinScheduler web UI, go and see the +`DolphinScheduler project page`_. They is a new process definition be created by *PyDolphinScheduler* and it +named "tutorial" or "tutorial_decorator". The task graph of workflow like below: .. 
literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py :language: text diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py b/dolphinscheduler-python/pydolphinscheduler/setup.py index 9632bb1fd9..62f20a54ce 100644 --- a/dolphinscheduler-python/pydolphinscheduler/setup.py +++ b/dolphinscheduler-python/pydolphinscheduler/setup.py @@ -51,6 +51,7 @@ doc = [ "sphinx>=4.3", "sphinx_rtd_theme>=1.0", "sphinx-click>=3.0", + "sphinx-inline-tabs", ] test = [ diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py index 31dc9446d8..7497d1f289 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py @@ -18,10 +18,12 @@ """Init pydolphinscheduler.core package.""" from pydolphinscheduler.core.database import Database +from pydolphinscheduler.core.engine import Engine from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.task import Task __all__ = [ + "Engine", "ProcessDefinition", "Task", "Database", diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py new file mode 100644 index 0000000000..986c1bbb6e --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +r""" +A tutorial example that takes you through pydolphinscheduler's task decorator. + +After tutorial_decorator.py is submitted to Apache DolphinScheduler server, a DAG will be created, +and the workflow DAG graph is as below: + + --> task_child_one + / \ +task_parent --> --> task_union + \ / + --> task_child_two + +it will instantiate and run all the tasks it has. +""" + +# [start tutorial] +# [start package_import] +# Import ProcessDefinition object to define your workflow attributes +from pydolphinscheduler.core.process_definition import ProcessDefinition + +# Import the task decorator because we will turn Python functions into tasks later +from pydolphinscheduler.tasks.func_wrap import task + +# [end package_import] + + +# [start task_declare] +@task +def task_parent(): + """First task in this workflow.""" + print("echo hello pydolphinscheduler") + + +@task +def task_child_one(): + """Child task will be run parallel after task ``task_parent`` finished.""" + print("echo 'child one'") + + +@task +def task_child_two(): + """Child task will be run parallel after task ``task_parent`` finished.""" + print("echo 'child two'") + + +@task +def task_union(): + """Last task in this workflow.""" + print("echo union") + + +# [end task_declare] + + +# [start workflow_declare] +with ProcessDefinition( + name="tutorial_decorator", + schedule="0 0 0 * * ? 
*", + start_time="2021-01-01", + tenant="tenant_exists", +) as pd: + # [end workflow_declare] + + # [start task_relation_declare] + task_group = [task_child_one(), task_child_two()] + task_parent().set_downstream(task_group) + + task_union() << task_group + # [end task_relation_declare] + + # [start submit_or_run] + pd.run() + # [end submit_or_run] +# [end tutorial] diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py new file mode 100644 index 0000000000..c0b73a1fc2 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task function wrapper allows using decorator to create a task.""" + +import functools +import inspect +import itertools +import types + +from pydolphinscheduler.exceptions import PyDSParamException +from pydolphinscheduler.tasks.python import Python + + +def _get_func_str(func: types.FunctionType) -> str: + """Get Python function string without indent from decorator. + + :param func: The function which wraps by decorator ``@task``. 
+ """ + lines = inspect.getsourcelines(func)[0] + + src_strip = "" + lead_space_num = None + for line in lines: + if lead_space_num is None: + lead_space_num = sum(1 for _ in itertools.takewhile(str.isspace, line)) + if line.strip() == "@task": + continue + elif line.strip().startswith("@"): + raise PyDSParamException( + "Do no support other decorators for function ``task`` decorator." + ) + src_strip += line[lead_space_num:] + return src_strip + + +def task(func: types.FunctionType): + """Decorator which converts a Python function into a pydolphinscheduler task.""" + + @functools.wraps(func) + def wrapper(*args, **kwargs): + func_str = _get_func_str(func) + return Python( + name=kwargs.pop("name", func.__name__), definition=func_str, *args, **kwargs + ) + + return wrapper diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py index 79504808c8..52903d48d9 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py @@ -18,34 +18,85 @@ """Task Python.""" import inspect +import logging +import re import types -from typing import Any +from typing import Union from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core.task import Task from pydolphinscheduler.exceptions import PyDSParamException +log = logging.getLogger(__file__) + class Python(Task): - """Task Python object, declare behavior for Python task to dolphinscheduler.""" + """Task Python object, declare behavior for Python task to dolphinscheduler. + + Python task support two types of parameters for :param:``code``, and here is an example: + + Using str type of :param:``code`` + + .. code-block:: python + + python_task = Python(name="str_type", code="print('Hello Python task.')") + + Or using Python callable type of :param:``code`` + + .. 
code-block:: python + + def foo(): + print("Hello Python task.") + + python_task = Python(name="str_type", code=foo) + + :param name: The name for Python task. It define the task name. + :param definition: String format of Python script you want to execute or Python callable you + want to execute. + """ _task_custom_attr = { "raw_script", } - def __init__(self, name: str, code: Any, *args, **kwargs): + def __init__( + self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs + ): super().__init__(name, TaskType.PYTHON, *args, **kwargs) - self._code = code + self.definition = definition + + def _build_exe_str(self) -> str: + """Build executable string from given definition. + + Attribute ``self.definition`` almost is a function, we need to call this function after parsing it + to string. The easier way to call a function is using syntax ``func()`` and we use it to call it too. + """ + if isinstance(self.definition, types.FunctionType): + py_function = inspect.getsource(self.definition) + func_str = f"{py_function}{self.definition.__name__}()" + else: + pattern = re.compile("^def (\\w+)\\(") + find = pattern.findall(self.definition) + if not find: + log.warning( + "Python definition is simple script instead of function, with value %s", + self.definition, + ) + return self.definition + # Keep function str and function callable always have one blank line + func_str = ( + f"{self.definition}{find[0]}()" + if self.definition.endswith("\n") + else f"{self.definition}\n{find[0]}()" + ) + return func_str @property def raw_script(self) -> str: """Get python task define attribute `raw_script`.""" - if isinstance(self._code, str): - return self._code - elif isinstance(self._code, types.FunctionType): - py_function = inspect.getsource(self._code) - return py_function + if isinstance(self.definition, (str, types.FunctionType)): + return self._build_exe_str() else: raise PyDSParamException( - "Parameter code do not support % for now.", type(self._code) + 
"Parameter definition do not support % for now.", type(self.definition) ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py index 5bf897f560..70f367767c 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py @@ -44,7 +44,7 @@ def test_task_without_example(): Avoiding add new type of tasks but without adding example describe how to use it. """ # We use example/tutorial.py as shell task example - ignore_name = {"__init__.py", "shell.py"} + ignore_name = {"__init__.py", "shell.py", "func_wrap.py"} all_tasks = {task.stem for task in get_tasks(ignore_name=ignore_name)} have_example_tasks = set() @@ -97,7 +97,7 @@ def test_example_basic(): ), f"We expect all examples is python script, but get {ex.name}." # All except tutorial and __init__ is end with keyword "_example" - if ex.stem != "tutorial" and ex.stem != "__init__": + if ex.stem not in ("tutorial", "tutorial_decorator") and ex.stem != "__init__": assert ex.stem.endswith( "_example" ), f"We expect all examples script end with keyword '_example', but get {ex.stem}." 
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py index 85e5e23e31..218fa4a55c 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py @@ -18,6 +18,7 @@ """Test whether success submit examples DAG to PythonGatewayService.""" from pathlib import Path +from subprocess import check_call import pytest @@ -36,6 +37,8 @@ from tests.testing.path import path_example def test_exec_white_list_example(example_path: Path): """Test execute examples and submit DAG to PythonGatewayService.""" try: - exec(example_path.read_text()) + # Because our task decorator used module ``inspect`` to get the source, and it will + # raise IOError when call it by built-in function ``exec``, so we run it via ``subprocess.check_call`` + check_call(["python", str(example_path)]) except Exception: raise Exception("Run example %s failed.", example_path.stem) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py new file mode 100644 index 0000000000..628b6e7f86 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py @@ -0,0 +1,169 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests for the ``task`` function-wrapping decorator.""" + +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.exceptions import PyDSParamException +from pydolphinscheduler.tasks.func_wrap import task +from tests.testing.decorator import foo as foo_decorator +from tests.testing.task import Task + +PD_NAME = "test_process_definition" +TASK_NAME = "test_task" + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1) +) +def test_single_task_outside(mock_code): + """Test a single decorated task defined outside the process definition.""" + + @task + def foo(): + print(TASK_NAME) + + with ProcessDefinition(PD_NAME) as pd: + foo() + + assert pd is not None and pd.name == PD_NAME + assert len(pd.tasks) == 1 + + pd_task = pd.tasks[12345] + assert pd_task.name == "foo" + assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()" + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1) +) +def test_single_task_inside(mock_code): + """Test a single decorated task defined inside the process definition.""" + with ProcessDefinition(PD_NAME) as pd: + + @task + def foo(): + print(TASK_NAME) + + foo() + + assert pd is not None and pd.name == PD_NAME + assert len(pd.tasks) == 1 + + pd_task = pd.tasks[12345] + assert pd_task.name == "foo" + assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()" + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1) +)
+def test_addition_decorator_error(mock_code): + """Test the error raised when ``task`` wraps an already-decorated function.""" + + @task + @foo_decorator + def foo(): + print(TASK_NAME) + + with ProcessDefinition(PD_NAME) as pd: # noqa: F841 + with pytest.raises( + PyDSParamException, match="Do no support other decorators for.*" + ): + foo() + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version, +) +def test_multiple_tasks_outside(mock_code): + """Test multiple decorated tasks defined outside the process definition.""" + + @task + def foo(): + print(TASK_NAME) + + @task + def bar(): + print(TASK_NAME) + + with ProcessDefinition(PD_NAME) as pd: + foo = foo() + bar = bar() + + foo >> bar + + assert pd is not None and pd.name == PD_NAME + assert len(pd.tasks) == 2 + + task_foo = pd.get_one_task_by_name("foo") + task_bar = pd.get_one_task_by_name("bar") + assert set(pd.task_list) == {task_foo, task_bar} + assert ( + task_foo is not None + and task_foo._upstream_task_codes == set() + and task_foo._downstream_task_codes.pop() == task_bar.code + ) + assert ( + task_bar is not None + and task_bar._upstream_task_codes.pop() == task_foo.code + and task_bar._downstream_task_codes == set() + ) + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version, +) +def test_multiple_tasks_inside(mock_code): + """Test multiple decorated tasks defined inside the process definition.""" + with ProcessDefinition(PD_NAME) as pd: + + @task + def foo(): + print(TASK_NAME) + + @task + def bar(): + print(TASK_NAME) + + foo = foo() + bar = bar() + + foo >> bar + + assert pd is not None and pd.name == PD_NAME + assert len(pd.tasks) == 2 + + task_foo = pd.get_one_task_by_name("foo") + task_bar = pd.get_one_task_by_name("bar") + assert set(pd.task_list) == {task_foo, task_bar} + assert ( + task_foo is not None + and
task_foo._upstream_task_codes == set() + and task_foo._downstream_task_codes.pop() == task_bar.code + ) + assert ( + task_bar is not None + and task_bar._upstream_task_codes.pop() == task_foo.code + and task_bar._downstream_task_codes == set() + ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py index dbcd2986fb..1cdd85d2cb 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py @@ -26,11 +26,15 @@ from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.tasks.python import Python +def foo(): # noqa: D103 + print("hello world.") + + @pytest.mark.parametrize( "attr, expect", [ ( - {"code": "print(1)"}, + {"definition": "print(1)"}, { "rawScript": "print(1)", "localParams": [], @@ -39,7 +43,29 @@ from pydolphinscheduler.tasks.python import Python "waitStartTimeout": {}, "conditionResult": {"successNode": [""], "failedNode": [""]}, }, - ) + ), + ( + {"definition": "def foo():\n print('I am foo')"}, + { + "rawScript": "def foo():\n print('I am foo')\nfoo()", + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ), + ( + {"definition": foo}, + { + "rawScript": 'def foo(): # noqa: D103\n print("hello world.")\nfoo()', + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ), ], ) @patch( @@ -66,15 +92,13 @@ def test_property_task_params(mock_code_version, attr, expect): def test_python_task_not_support_code(mock_code, script_code): """Test python task parameters.""" name = "not_support_code_type" - with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"): + with pytest.raises( + PyDSParamException, 
match="Parameter definition do not support .*?" + ): task = Python(name, script_code) task.raw_script -def foo(): # noqa: D103 - print("hello world.") - - @pytest.mark.parametrize( "name, script_code, raw", [ @@ -82,7 +106,7 @@ def foo(): # noqa: D103 ( "function_define", foo, - 'def foo(): # noqa: D103\n print("hello world.")\n', + 'def foo(): # noqa: D103\n print("hello world.")\nfoo()', ), ], ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py new file mode 100644 index 0000000000..78078ee863 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Decorators used by the testing module.""" + +import types +from functools import wraps + + +def foo(func: types.FunctionType): + """Decorate ``func`` so a marker line is printed before it is called; for tests only.""" + + @wraps(func) + def wrapper(): + print("foo decorator called.") + func() + + return wrapper