Browse Source

[DSIP-11][python] create workflows from YAML configuration (#11611)

3.2.0-release
JieguangZhou 2 years ago committed by GitHub
parent
commit
38ee91fb1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst
  2. 13
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst
  3. 14
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst
  4. 7
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst
  5. 8
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst
  6. 8
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst
  7. 8
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst
  8. 8
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst
  9. 8
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst
  10. 8
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst
  11. 14
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst
  12. 17
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst
  13. 9
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst
  14. 96
      dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
  15. 43
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml
  16. 33
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml
  17. 76
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml
  18. 26
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml
  19. 29
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml
  20. 37
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml
  21. 29
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml
  22. 40
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml
  23. 27
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml
  24. 30
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml
  25. 40
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml
  26. 30
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml
  27. 45
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml
  28. 27
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml
  29. 39
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml
  30. 62
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json
  31. 22
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql
  32. 26
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml
  33. 46
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml
  34. 14
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py
  35. 466
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py
  36. 1
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
  37. 191
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py
  38. 1
      dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py

7
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst

@ -31,3 +31,10 @@ Dive Into
--------- ---------
.. automodule:: pydolphinscheduler.tasks.condition .. automodule:: pydolphinscheduler.tasks.condition
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Condition.yaml
:start-after: # under the License.
:language: yaml

13
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst

@ -31,3 +31,16 @@ Dive Into
--------- ---------
.. automodule:: pydolphinscheduler.tasks.datax .. automodule:: pydolphinscheduler.tasks.datax
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/DataX.yaml
:start-after: # under the License.
:language: yaml
example_datax.json:
.. literalinclude:: ../../../examples/yaml_define/example_datax.json
:language: json

14
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst

@ -31,3 +31,17 @@ Dive Into
--------- ---------
.. automodule:: pydolphinscheduler.tasks.dependent .. automodule:: pydolphinscheduler.tasks.dependent
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Dependent.yaml
:start-after: # under the License.
:language: yaml
Dependent_External.yaml:
.. literalinclude:: ../../../examples/yaml_define/Dependent_External.yaml
:start-after: # under the License.
:language: yaml

7
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst

@ -31,3 +31,10 @@ Dive Into
--------- ---------
.. automodule:: pydolphinscheduler.tasks.flink .. automodule:: pydolphinscheduler.tasks.flink
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Flink.yaml
:start-after: # under the License.
:language: yaml

8
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst

@ -19,3 +19,11 @@ HTTP
==== ====
.. automodule:: pydolphinscheduler.tasks.http .. automodule:: pydolphinscheduler.tasks.http
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Http.yaml
:start-after: # under the License.
:language: yaml

8
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst

@ -32,3 +32,11 @@ Dive Into
--------- ---------
.. automodule:: pydolphinscheduler.tasks.map_reduce .. automodule:: pydolphinscheduler.tasks.map_reduce
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/MapReduce.yaml
:start-after: # under the License.
:language: yaml

8
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst

@ -19,3 +19,11 @@ Procedure
========= =========
.. automodule:: pydolphinscheduler.tasks.procedure .. automodule:: pydolphinscheduler.tasks.procedure
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Procedure.yaml
:start-after: # under the License.
:language: yaml

8
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst

@ -19,3 +19,11 @@ Python
====== ======
.. automodule:: pydolphinscheduler.tasks.python .. automodule:: pydolphinscheduler.tasks.python
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Python.yaml
:start-after: # under the License.
:language: yaml

8
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst

@ -31,3 +31,11 @@ Dive Into
--------- ---------
.. automodule:: pydolphinscheduler.tasks.shell .. automodule:: pydolphinscheduler.tasks.shell
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Shell.yaml
:start-after: # under the License.
:language: yaml

8
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst

@ -31,3 +31,11 @@ Dive Into
--------- ---------
.. automodule:: pydolphinscheduler.tasks.spark .. automodule:: pydolphinscheduler.tasks.spark
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Spark.yaml
:start-after: # under the License.
:language: yaml

14
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst

@ -19,3 +19,17 @@ SQL
=== ===
.. automodule:: pydolphinscheduler.tasks.sql .. automodule:: pydolphinscheduler.tasks.sql
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Sql.yaml
:start-after: # under the License.
:language: yaml
example_sql.sql:
.. literalinclude:: ../../../examples/yaml_define/example_sql.sql
:start-after: */
:language: sql

17
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst

@ -19,3 +19,20 @@ Sub Process
=========== ===========
.. automodule:: pydolphinscheduler.tasks.sub_process .. automodule:: pydolphinscheduler.tasks.sub_process
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/SubProcess.yaml
:start-after: # under the License.
:language: yaml
example_subprocess.yaml:
.. literalinclude:: ../../../examples/yaml_define/example_sub_workflow.yaml
:start-after: # under the License.
:language: yaml

9
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst

@ -31,3 +31,12 @@ Dive Into
--------- ---------
.. automodule:: pydolphinscheduler.tasks.switch .. automodule:: pydolphinscheduler.tasks.switch
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Switch.yaml
:start-after: # under the License.
:language: yaml

96
dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst

@ -37,6 +37,8 @@ There are two types of tutorials: traditional and task decorator.
versatility to the traditional way because it only supported Python functions and without build-in tasks versatility to the traditional way because it only supported Python functions and without build-in tasks
supported. But it is helpful if your workflow is all built with Python or if you already have some Python supported. But it is helpful if your workflow is all built with Python or if you already have some Python
workflow code and want to migrate them to pydolphinscheduler. workflow code and want to migrate them to pydolphinscheduler.
- **YAML File**: We can use pydolphinscheduler CLI to create process using YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`.
We can find more YAML file examples in `examples/yaml_define <https://github.com/apache/dolphinscheduler/tree/py-yaml/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define>`_
.. tab:: Tradition .. tab:: Tradition
@ -52,6 +54,12 @@ There are two types of tutorials: traditional and task decorator.
:start-after: [start tutorial] :start-after: [start tutorial]
:end-before: [end tutorial] :end-before: [end tutorial]
.. tab:: YAML File
.. literalinclude:: ../../examples/yaml_define/tutorial.yaml
:start-after: # under the License.
:language: yaml
Import Necessary Module Import Necessary Module
----------------------- -----------------------
@ -104,6 +112,13 @@ will be running this task in the DolphinScheduler worker. See :ref:`section tena
:start-after: [start workflow_declare] :start-after: [start workflow_declare]
:end-before: [end workflow_declare] :end-before: [end workflow_declare]
.. tab:: YAML File
.. literalinclude:: ../../examples/yaml_define/tutorial.yaml
:start-after: # under the License.
:end-before: # Define the tasks under the workflow
:language: yaml
We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>` We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
if you are interested in it. For all arguments of object process definition, you could find in the if you are interested in it. For all arguments of object process definition, you could find in the
:class:`pydolphinscheduler.core.process_definition` API documentation. :class:`pydolphinscheduler.core.process_definition` API documentation.
@ -139,6 +154,12 @@ Task Declaration
It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we only use It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we only use
Python function as a task and could not use the :doc:`built-in tasks <tasks/index>` most of the cases. Python function as a task and could not use the :doc:`built-in tasks <tasks/index>` most of the cases.
.. tab:: YAML File
.. literalinclude:: ../../examples/yaml_define/tutorial.yaml
:start-after: # Define the tasks under the workflow
:language: yaml
Setting Task Dependence Setting Task Dependence
----------------------- -----------------------
@ -167,6 +188,14 @@ and task `task_child_two` was done, because both two task is `task_union`'s upst
:start-after: [start task_relation_declare] :start-after: [start task_relation_declare]
:end-before: [end task_relation_declare] :end-before: [end task_relation_declare]
.. tab:: YAML File
We can use :code:`deps:[]` to set task dependence
.. literalinclude:: ../../examples/yaml_define/tutorial.yaml
:start-after: # Define the tasks under the workflow
:language: yaml
.. note:: .. note::
We could set task dependence in batch mode if they have the same downstream or upstream by declaring those We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
@ -198,6 +227,17 @@ will create workflow definition as well as workflow schedule.
:start-after: [start submit_or_run] :start-after: [start submit_or_run]
:end-before: [end submit_or_run] :end-before: [end submit_or_run]
.. tab:: YAML File
pydolphinscheduler YAML CLI always submit workflow. We can run the workflow if we set :code:`run: true`
.. code-block:: yaml
# Define the workflow
workflow:
name: "tutorial"
run: true
At last, we could execute this workflow code in your terminal like other Python scripts, running At last, we could execute this workflow code in your terminal like other Python scripts, running
:code:`python tutorial.py` to trigger and execute it. :code:`python tutorial.py` to trigger and execute it.
@ -219,5 +259,61 @@ named "tutorial" or "tutorial_decorator". The task graph of workflow like below:
:language: text :language: text
:lines: 24-28 :lines: 24-28
Create Process Using YAML File
------------------------------
We can use pydolphinscheduler CLI to create process using YAML file
.. code-block:: bash
pydolphinscheduler yaml -f Shell.yaml
We can use the following three special grammars to define workflows more flexibly.
- :code:`$FILE{"file_name"}`: Read the file (:code:`file_name`) contents and replace them to that location.
- :code:`$WORKFLOW{"other_workflow.yaml"}`: Refer to another process defined using YAML file (:code:`other_workflow.yaml`) and replace the process name in this location.
- :code:`$ENV{env_name}`: Read the environment variable (:code:`env_name`) and replace it to that location.
- :code:`${CONFIG.key_name}`: Read the configuration value of key (:code:`key_name`) and it them to that location.
In addition, when loading the file path use :code:`$FILE{"file_name"}` or :code:`$WORKFLOW{"other_workflow.yaml"}`, pydolphinscheduler will search in the path of the YAMl file if the file does not exist.
For exmaples, our file directory structure is as follows:
.. code-block:: bash
.
└── yaml_define
├── Condition.yaml
├── DataX.yaml
├── Dependent_External.yaml
├── Dependent.yaml
├── example_datax.json
├── example_sql.sql
├── example_subprocess.yaml
├── Flink.yaml
├── Http.yaml
├── MapReduce.yaml
├── MoreConfiguration.yaml
├── Procedure.yaml
├── Python.yaml
├── Shell.yaml
├── Spark.yaml
├── Sql.yaml
├── SubProcess.yaml
└── Switch.yaml
After we run
.. code-block:: bash
pydolphinscheduler yaml -file yaml_define/SubProcess.yaml
the :code:`$WORKFLOW{"example_sub_workflow.yaml"}` will be set to :code:`$WORKFLOW{"yaml_define/example_sub_workflow.yaml"}`, because :code:`./example_subprocess.yaml` does not exist and :code:`yaml_define/example_sub_workflow.yaml` does.
Furthermore, this feature supports recursion all the way down.
.. _`DolphinScheduler project page`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project.html .. _`DolphinScheduler project page`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project.html
.. _`Python context manager`: https://docs.python.org/3/library/stdtypes.html#context-manager-types .. _`Python context manager`: https://docs.python.org/3/library/stdtypes.html#context-manager-types

43
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml

@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Condition"
# Define the tasks under the workflow
tasks:
- { "task_type": "Shell", "name": "pre_task_1", "command": "echo pre_task_1" }
- { "task_type": "Shell", "name": "pre_task_2", "command": "echo pre_task_2" }
- { "task_type": "Shell", "name": "pre_task_3", "command": "echo pre_task_3" }
- { "task_type": "Shell", "name": "success_branch", "command": "echo success_branch" }
- { "task_type": "Shell", "name": "fail_branch", "command": "echo fail_branch" }
- name: condition
task_type: Condition
success_task: success_branch
failed_task: fail_branch
op: AND
groups:
- op: AND
groups:
- task: pre_task_1
flag: true
- task: pre_task_2
flag: true
- task: pre_task_3
flag: false

33
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml

@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "DataX"
# Define the tasks under the workflow
tasks:
- name: task
task_type: DataX
datasource_name: db
datatarget_name: db
sql: show tables;
target_table: table_test
- name: task_custon_config
task_type: CustomDataX
json: $FILE{"example_datax.json"}

76
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml

@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
workflow:
name: "Dependent"
# Define the tasks under the workflow
tasks:
- name: dependent
task_type: Dependent
denpendence:
op: and
groups:
- op: or
groups:
- project_name: pydolphin
process_definition_name: task_dependent_external
dependent_task_name: task_1
- project_name: pydolphin
process_definition_name: task_dependent_external
dependent_task_name: task_2
- op: and
groups:
- project_name: pydolphin
process_definition_name: task_dependent_external
dependent_task_name: task_1
dependent_date: LAST_WEDNESDAY
- project_name: pydolphin
process_definition_name: task_dependent_external
dependent_task_name: task_2
dependent_date: last24Hours
- name: dependent_var
task_type: Dependent
denpendence:
op: and
groups:
- op: or
# we can use ${CONFIG.WORKFLOW_PROJECT} to set the value to configuration.WORKFLOW_PROJECT
# we can use $WORKFLOW{"Dependent_External.yaml"} to create or update a workflow from dependent_external.yaml and set the value to that workflow name
groups:
- project_name: ${CONFIG.WORKFLOW_PROJECT}
process_definition_name: $WORKFLOW{"Dependent_External.yaml"}
dependent_task_name: task_1
- project_name: ${CONFIG.WORKFLOW_PROJECT}
process_definition_name: $WORKFLOW{"Dependent_External.yaml"}
dependent_task_name: task_2
- op: and
groups:
- project_name: ${CONFIG.WORKFLOW_PROJECT}
process_definition_name: $WORKFLOW{"Dependent_External.yaml"}
dependent_task_name: task_1
dependent_date: LAST_WEDNESDAY
- project_name: ${CONFIG.WORKFLOW_PROJECT}
process_definition_name: $WORKFLOW{"Dependent_External.yaml"}
dependent_task_name: task_2
dependent_date: last24Hours

26
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml

@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "task_dependent_external"
# Define the tasks under the workflow
tasks:
- { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
- { "task_type": "Shell", "name": "task_2", "command": "echo task 2" }
- { "task_type": "Shell", "name": "task_3", "command": "echo task 3" }

29
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml

@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Flink"
# Define the tasks under the workflow
tasks:
- name: task
task_type: Flink
main_class: org.apache.flink.streaming.examples.wordcount.WordCount
main_package: test_java.jar
program_type: JAVA
deploy_mode: local

37
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml

@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Http"
# Define the tasks under the workflow
tasks:
- name: task
task_type: Http
url: "https://httpbin.org/get"
http_method: "GET"
http_params:
- { "prop": "a", "httpParametersType": "PARAMETER", "value": "1" }
- { "prop": "b", "httpParametersType": "PARAMETER", "value": "2" }
- {
"prop": "Content-Type",
"httpParametersType": "header",
"value": "test",
}
http_check_condition: "STATUS_CODE_CUSTOM"
condition: "404"

29
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml

@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "MapReduce"
# Define the tasks under the workflow
tasks:
- name: task
task_type: MR
main_class: wordcount
main_package: test_java.jar
program_type: SCALA
main_args: /dolphinscheduler/tenant_exists/resources/file.txt /output/ds

40
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml

@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "MoreConfiguration"
param:
n: 1
# Define the tasks under the workflow
tasks:
- name: shell_0
task_type: Shell
description: "yaml define task"
flag: "YES"
command: |
echo "$ENV{HOME}"
echo "${n}"
task_priority: "HIGH"
delay_time: 20
fail_retry_times: 30
fail_retry_interval: 5
timeout_flag: "CLOSE"
timeout: 60
local_params:
- { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" }

27
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml

@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Procedure"
# Define the tasks under the workflow
tasks:
- name: task
task_type: Procedure
datasource_name: db
method: show tables;

30
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml

@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Python"
# Define the tasks under the workflow
tasks:
- name: python
task_type: Python
definition: |
import os
print(os)
print("1")
print("2")

40
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml

@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Shell"
release_state: "offline"
run: true
# Define the tasks under the process
tasks:
- name: task_parent
task_type: Shell
command: |
echo hello pydolphinscheduler
echo run task parent
- name: task_child_one
task_type: Shell
deps: [task_parent]
command: echo "child one"
- name: task_child_two
task_type: Shell
deps: [task_parent]
command: echo "child two"

30
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml

@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Spark"
# Define the tasks under the workflow
tasks:
- name: task
task_type: Spark
main_class: org.apache.spark.examples.SparkPi
main_package: test_java.jar
program_type: SCALA
deploy_mode: local
spark_version: SPARK1

45
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml

@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Sql"
# Define the tasks under the workflow
tasks:
- name: task_base
task_type: Sql
datasource_name: "db"
sql: show tables;
- name: task_multi_line
task_type: Sql
datasource_name: "db"
sql: |
show tables;
select id from version where id=1;
- name: task_file
task_type: Sql
datasource_name: "db"
sql: $FILE{"example_sql.sql"}
# Or you can define task "task_union" it with one line
- { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"}
# Or you can define task "task_union" it with one line
- { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'}

27
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml

@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "SubWorkflow"
tasks:
- name: example_workflow
task_type: SubProcess
process_definition_name: $WORKFLOW{"example_sub_workflow.yaml"}
- { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }

39
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml

@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Switch"
param:
var: 1
# Define the tasks under the workflow
tasks:
- name: switch_child_1
task_type: Shell
command: echo switch_child_1
- name: switch_child_2
task_type: Shell
command: echo switch_child_2
- name: switch
task_type: Switch
condition:
- task: switch_child_1
condition: "${var} > 1"
- task: switch_child_2

62
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json

@ -0,0 +1,62 @@
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "usr",
"password": "pwd",
"column": [
"id",
"name",
"code",
"description"
],
"splitPk": "id",
"connection": [
{
"table": [
"source_table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/source_db"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "usr",
"password": "pwd",
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
"table": [
"target_table"
]
}
]
}
}
}
],
"setting": {
"errorLimit": {
"percentage": 0,
"record": 0
},
"speed": {
"channel": 1,
"record": 1000
}
}
}
}

22
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
select id from version where id=1;
select id from version where id=2;
select id from version where id=3;
select id from version where id=4;
select id from version where id=5;

26
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml

@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "example_workflow_for_sub_workflow"
# Define the tasks under the workflow
tasks:
- { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
- { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
- { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }

46
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml

@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "tutorial"
schedule: "0 0 0 * * ? *"
start_time: "2021-01-01"
tenant: "tenant_exists"
release_state: "offline"
run: true
# Define the tasks under the workflow
tasks:
- name: task_parent
task_type: Shell
command: echo hello pydolphinscheduler
- name: task_child_one
task_type: Shell
deps: [task_parent]
command: echo "child one"
- name: task_child_two
task_type: Shell
deps: [task_parent]
command: echo "child two"
- name: task_union
task_type: Shell
deps: [task_child_one, task_child_two]
command: echo "union"

14
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py

@ -26,6 +26,7 @@ from pydolphinscheduler.configuration import (
init_config_file, init_config_file,
set_single_config, set_single_config,
) )
from pydolphinscheduler.core.yaml_process_define import create_process_definition
version_option_val = ["major", "minor", "micro"] version_option_val = ["major", "minor", "micro"]
@ -90,3 +91,16 @@ def config(getter, setter, init) -> None:
for key, val in setter: for key, val in setter:
set_single_config(key, val) set_single_config(key, val)
click.echo("Set configuration done.") click.echo("Set configuration done.")
@cli.command()
@click.option(
"--yaml_file",
"-f",
required=True,
help="YAML file path",
type=click.Path(exists=True),
)
def yaml(yaml_file) -> None:
"""Create process definition using YAML file."""
create_process_definition(yaml_file)

466
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py

@ -0,0 +1,466 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Parse YAML file to create process."""
import logging
import os
import re
from pathlib import Path
from typing import Any, Dict
from pydolphinscheduler import configuration, tasks
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
from pydolphinscheduler.utils.yaml_parser import YamlParser
logger = logging.getLogger(__file__)
KEY_PROCESS = "workflow"
KEY_TASK = "tasks"
KEY_TASK_TYPE = "task_type"
KEY_DEPS = "deps"
KEY_OP = "op"
TASK_SPECIAL_KEYS = [KEY_TASK_TYPE, KEY_DEPS]
class ParseTool:
"""Enhanced parsing tools."""
@staticmethod
def parse_string_param_if_file(string_param: str, **kwargs):
"""Use $FILE{"data_path"} to load file from "data_path"."""
if string_param.startswith("$FILE"):
path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0]
base_folder = kwargs.get("base_folder", ".")
path = ParseTool.get_possible_path(path, base_folder)
with open(path, "r") as read_file:
string_param = "".join(read_file)
return string_param
@staticmethod
def parse_string_param_if_env(string_param: str, **kwargs):
"""Use $ENV{env_name} to load environment variable "env_name"."""
if "$ENV" in string_param:
key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0]
env_value = os.environ.get(key, "$%s" % key)
string_param = string_param.replace("$ENV{%s}" % key, env_value)
return string_param
@staticmethod
def parse_string_param_if_config(string_param: str, **kwargs):
"""Use ${CONFIG.var_name} to load variable "var_name" from configuration."""
if "${CONFIG" in string_param:
key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0]
if hasattr(configuration, key):
string_param = getattr(configuration, key)
else:
string_param = configuration.get_single_config(key)
return string_param
@staticmethod
def get_possible_path(file_path, base_folder):
"""Get file possible path.
Return new path if file_path is not exists, but base_folder + file_path exists
"""
possible_path = file_path
if not Path(file_path).exists():
new_path = Path(base_folder).joinpath(file_path)
if new_path.exists():
possible_path = new_path
logger.info(f"{file_path} not exists, convert to {possible_path}")
return possible_path
def get_task_cls(task_type) -> Task:
"""Get the task class object by task_type (case compatible)."""
# only get task class from tasks.__all__
all_task_types = {type_.capitalize(): type_ for type_ in tasks.__all__}
task_type_cap = task_type.capitalize()
if task_type_cap not in all_task_types:
raise PyDSTaskNoFoundException("cant not find task %s" % task_type)
standard_name = all_task_types[task_type_cap]
return getattr(tasks, standard_name)
class YamlProcess(YamlParser):
"""Yaml parser for create process.
:param yaml_file: yaml file path.
examples1 ::
parser = YamlParser(yaml_file=...)
parser.create_process_definition()
examples2 ::
YamlParser(yaml_file=...).create_process_definition()
"""
_parse_rules = [
ParseTool.parse_string_param_if_file,
ParseTool.parse_string_param_if_env,
ParseTool.parse_string_param_if_config,
]
def __init__(self, yaml_file: str):
with open(yaml_file, "r") as f:
content = f.read()
self._base_folder = Path(yaml_file).parent
content = self.prepare_refer_process(content)
super().__init__(content)
def create_process_definition(self):
"""Create process main function."""
# get process parameters with key "workflow"
process_params = self[KEY_PROCESS]
# pop "run" parameter, used at the end
is_run = process_params.pop("run", False)
# use YamlProcess._parse_rules to parse special value of yaml file
process_params = self.parse_params(process_params)
process_name = process_params["name"]
logger.info(f"Create Process: {process_name}")
with ProcessDefinition(**process_params) as pd:
# save dependencies between tasks
dependencies = {}
# save name and task mapping
name2task = {}
# get task datas with key "tasks"
for task_data in self[KEY_TASK]:
task = self.parse_task(task_data, name2task)
deps = task_data.get(KEY_DEPS, [])
if deps:
dependencies[task.name] = deps
name2task[task.name] = task
# build dependencies between task
for downstream_task_name, deps in dependencies.items():
downstream_task = name2task[downstream_task_name]
for upstream_task_name in deps:
upstream_task = name2task[upstream_task_name]
upstream_task >> downstream_task
pd.submit()
# if set is_run, run the process after submit
if is_run:
logger.info(f"run workflow: {pd}")
pd.run()
return process_name
def parse_params(self, params: Any):
"""Recursively resolves the parameter values.
The function operates params only when it encounters a string; other types continue recursively.
"""
if isinstance(params, str):
for parse_rule in self._parse_rules:
params_ = params
params = parse_rule(params, base_folder=self._base_folder)
if params_ != params:
logger.info(f"parse {params_} -> {params}")
elif isinstance(params, list):
for index in range(len(params)):
params[index] = self.parse_params(params[index])
elif isinstance(params, dict):
for key, value in params.items():
params[key] = self.parse_params(value)
return params
@classmethod
def parse(cls, yaml_file: str):
"""Recursively resolves the parameter values.
The function operates params only when it encounters a string; other types continue recursively.
"""
process_name = cls(yaml_file).create_process_definition()
return process_name
def prepare_refer_process(self, content):
"""Allow YAML files to reference process derived from other YAML files."""
process_paths = re.findall(r"\$WORKFLOW\{\"(.*?)\"\}", content)
for process_path in process_paths:
logger.info(
f"find special token {process_path}, load process form {process_path}"
)
possible_path = ParseTool.get_possible_path(process_path, self._base_folder)
process_name = YamlProcess.parse(possible_path)
content = content.replace('$WORKFLOW{"%s"}' % process_path, process_name)
return content
def parse_task(self, task_data: dict, name2task: Dict[str, Task]):
"""Parse various types of tasks.
:param task_data: dict.
{
"task_type": "Shell",
"params": {"name": "shell_task", "command":"ehco hellp"}
}
:param name2task: Dict[str, Task]), mapping of task_name and task
Some task type have special parse func:
if task type is Switch, use parse_switch;
if task type is Condition, use parse_condition;
if task type is Dependent, use parse_dependent;
other, we pass all task_params as input to task class, like "task_cls(**task_params)".
"""
task_type = task_data["task_type"]
# get params without special key
task_params = {k: v for k, v in task_data.items() if k not in TASK_SPECIAL_KEYS}
task_cls = get_task_cls(task_type)
# use YamlProcess._parse_rules to parse special value of yaml file
task_params = self.parse_params(task_params)
if task_cls == tasks.Switch:
task = self.parse_switch(task_params, name2task)
elif task_cls == tasks.Condition:
task = self.parse_condition(task_params, name2task)
elif task_cls == tasks.Dependent:
task = self.parse_dependent(task_params, name2task)
else:
task = task_cls(**task_params)
logger.info(task_type, task)
return task
def parse_switch(self, task_params, name2task):
"""Parse Switch Task.
This is an example Yaml fragment of task_params
name: switch
condition:
- ["${var} > 1", switch_child_1]
- switch_child_2
"""
from pydolphinscheduler.tasks.switch import (
Branch,
Default,
Switch,
SwitchCondition,
)
condition_datas = task_params["condition"]
conditions = []
for condition_data in condition_datas:
assert "task" in condition_data, "task must be in %s" % condition_data
task_name = condition_data["task"]
condition_string = condition_data.get("condition", None)
# if condition_string is None, for example: {"task": "switch_child_2"}, set it to Default branch
if condition_string is None:
conditions.append(Default(task=name2task.get(task_name)))
# if condition_string is not None, for example:
# {"task": "switch_child_2", "condition": "${var} > 1"} set it to Branch
else:
conditions.append(
Branch(condition_string, task=name2task.get(task_name))
)
switch = Switch(
name=task_params["name"], condition=SwitchCondition(*conditions)
)
return switch
def parse_condition(self, task_params, name2task):
"""Parse Condition Task.
This is an example Yaml fragment of task_params
name: condition
success_task: success_branch
failed_task: fail_branch
OP: AND
groups:
-
OP: AND
groups:
- [pre_task_1, true]
- [pre_task_2, true]
- [pre_task_3, false]
-
OP: AND
groups:
- [pre_task_1, false]
- [pre_task_2, true]
- [pre_task_3, true]
"""
from pydolphinscheduler.tasks.condition import (
FAILURE,
SUCCESS,
And,
Condition,
Or,
)
def get_op_cls(op):
cls = None
if op.lower() == "and":
cls = And
elif op.lower() == "or":
cls = Or
else:
raise Exception("OP must be in And or Or, but get: %s" % op)
return cls
second_cond_ops = []
for first_group in task_params["groups"]:
second_op = first_group["op"]
task_ops = []
for condition_data in first_group["groups"]:
assert "task" in condition_data, "task must be in %s" % condition_data
assert "flag" in condition_data, "flag must be in %s" % condition_data
task_name = condition_data["task"]
flag = condition_data["flag"]
task = name2task[task_name]
# for example: task = pre_task_1, flag = true
if flag:
task_ops.append(SUCCESS(task))
else:
task_ops.append(FAILURE(task))
second_cond_ops.append(get_op_cls(second_op)(*task_ops))
first_op = task_params["op"]
cond_operator = get_op_cls(first_op)(*second_cond_ops)
condition = Condition(
name=task_params["name"],
condition=cond_operator,
success_task=name2task[task_params["success_task"]],
failed_task=name2task[task_params["failed_task"]],
)
return condition
def parse_dependent(self, task_params, name2task):
"""Parse Dependent Task.
This is an example Yaml fragment of task_params
name: dependent
denpendence:
OP: AND
groups:
-
OP: Or
groups:
- [pydolphin, task_dependent_external, task_1]
- [pydolphin, task_dependent_external, task_2]
-
OP: And
groups:
- [pydolphin, task_dependent_external, task_1, LAST_WEDNESDAY]
- [pydolphin, task_dependent_external, task_2, last24Hours]
"""
from pydolphinscheduler.tasks.dependent import (
And,
Dependent,
DependentDate,
DependentItem,
Or,
)
def process_dependent_date(dependent_date):
"""Parse dependent date (Compatible with key and value of DependentDate)."""
dependent_date_upper = dependent_date.upper()
if hasattr(DependentDate, dependent_date_upper):
dependent_date = getattr(DependentDate, dependent_date_upper)
return dependent_date
def get_op_cls(op):
cls = None
if op.lower() == "and":
cls = And
elif op.lower() == "or":
cls = Or
else:
raise Exception("OP must be in And or Or, but get: %s" % op)
return cls
def create_dependent_item(source_items):
"""Parse dependent item.
project_name: pydolphin
process_definition_name: task_dependent_external
dependent_task_name: task_1
dependent_date: LAST_WEDNESDAY
"""
project_name = source_items["project_name"]
process_definition_name = source_items["process_definition_name"]
dependent_task_name = source_items["dependent_task_name"]
dependent_date = source_items.get("dependent_date", DependentDate.TODAY)
dependent_item = DependentItem(
project_name=project_name,
process_definition_name=process_definition_name,
dependent_task_name=dependent_task_name,
dependent_date=process_dependent_date(dependent_date),
)
return dependent_item
second_dependences = []
for first_group in task_params["groups"]:
second_op = first_group[KEY_OP]
dependence_items = []
for source_items in first_group["groups"]:
dependence_items.append(create_dependent_item(source_items))
second_dependences.append(get_op_cls(second_op)(*dependence_items))
first_op = task_params[KEY_OP]
dependence = get_op_cls(first_op)(*second_dependences)
task = Dependent(
name=task_params["name"],
dependence=dependence,
)
return task
def create_process_definition(yaml_file):
"""CLI."""
YamlProcess.parse(yaml_file)

1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py

@ -35,6 +35,7 @@ from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondi
__all__ = [ __all__ = [
"Condition", "Condition",
"DataX", "DataX",
"CustomDataX",
"Dependent", "Dependent",
"Flink", "Flink",
"Http", "Http",

191
dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py

@ -0,0 +1,191 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test YAML process."""
import os
from pathlib import Path
from unittest.mock import patch
import pytest
from pydolphinscheduler import configuration, tasks
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.yaml_process_define import (
ParseTool,
create_process_definition,
get_task_cls,
)
from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
from tests.testing.path import path_yaml_example
from tests.testing.task import Task
@pytest.mark.parametrize(
"string_param, expect",
[
("$ENV{PROJECT_NAME}", "~/pydolphinscheduler"),
],
)
def test_parse_tool_env_exist(string_param, expect):
"""Test parsing the environment variable."""
os.environ["PROJECT_NAME"] = expect
assert expect == ParseTool.parse_string_param_if_env(string_param)
def test_parse_tool_env_not_exist():
"""Test parsing the not exist environment variable."""
key = "THIS_ENV_NOT_EXIST_0000000"
string_param = "$ENV{%s}" % key
expect = "$" + key
assert expect == ParseTool.parse_string_param_if_env(string_param)
@pytest.mark.parametrize(
"string_param, expect_key",
[
("${CONFIG.java_gateway.address}", "java_gateway.address"),
("${CONFIG.WORKFLOW_PROJECT}", "default.workflow.project"),
],
)
def test_parse_tool_config(string_param, expect_key):
"""Test parsing configuration."""
expect = configuration.get_single_config(expect_key)
assert expect == ParseTool.parse_string_param_if_config(string_param)
def test_parse_possible_yaml_file():
"""Test parsing possible path."""
folder = Path(path_yaml_example)
file_name = "Shell.yaml"
path = folder.joinpath(file_name)
with open(path, "r") as f:
expect = "".join(f)
string_param = '$FILE{"%s"}' % file_name
content_ = ParseTool.parse_string_param_if_file(string_param, base_folder=folder)
assert expect == content_
def test_parse_tool_parse_possible_path_file():
"""Test parsing possible path."""
folder = Path(path_yaml_example)
file_name = "Shell.yaml"
path = folder.joinpath(file_name)
possible_path = ParseTool.get_possible_path(path, base_folder=folder)
assert path == possible_path
possible_path = ParseTool.get_possible_path(file_name, base_folder=folder)
assert path == possible_path
possible_path = ParseTool.get_possible_path(file_name, base_folder=".")
assert path != possible_path
@pytest.mark.parametrize(
"task_type, expect",
[
("shell", tasks.Shell),
("Shell", tasks.Shell),
("ShEll", tasks.Shell),
("Condition", tasks.Condition),
("DataX", tasks.DataX),
("CustomDataX", tasks.CustomDataX),
("Dependent", tasks.Dependent),
("Flink", tasks.Flink),
("Http", tasks.Http),
("MR", tasks.MR),
("Procedure", tasks.Procedure),
("Python", tasks.Python),
("Shell", tasks.Shell),
("Spark", tasks.Spark),
("Sql", tasks.Sql),
("SubProcess", tasks.SubProcess),
("Switch", tasks.Switch),
("SageMaker", tasks.SageMaker),
],
)
def test_get_task(task_type, expect):
"""Test get task function."""
assert expect == get_task_cls(task_type)
@pytest.mark.parametrize(
"task_type",
[
("MYSQL"),
],
)
def test_get_error(task_type):
"""Test get task cls error."""
with pytest.raises(
PyDSTaskNoFoundException,
match=f"not find task {task_type}",
):
get_task_cls(task_type)
@pytest.mark.parametrize(
"yaml_file",
[
("Condition.yaml"),
("DataX.yaml"),
("Dependent.yaml"),
("Flink.yaml"),
("Procedure.yaml"),
("Http.yaml"),
("MapReduce.yaml"),
("Python.yaml"),
("Shell.yaml"),
("Spark.yaml"),
("Sql.yaml"),
("SubProcess.yaml"),
# ("Switch.yaml"),
("MoreConfiguration.yaml"),
],
)
@patch(
"pydolphinscheduler.core.engine.Engine.get_resource_info",
return_value=({"id": 1, "name": "test"}),
)
@patch(
"pydolphinscheduler.core.database.Database.get_database_info",
return_value=({"id": 1, "type": "mock_type"}),
)
@patch(
"pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
return_value={
"projectCode": 0,
"processDefinitionCode": 0,
"taskDefinitionCode": 0,
},
)
@patch.object(ProcessDefinition, "run")
@patch.object(ProcessDefinition, "submit")
def test_get_create_process_definition(
prun, psubmit, dep_item, db_info, resource_info, yaml_file
):
"""Test create_process_definition function to parse example YAML file."""
yaml_file_path = Path(path_yaml_example).joinpath(yaml_file)
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
):
create_process_definition(yaml_file_path)

1
dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py

@ -24,6 +24,7 @@ project_root = Path(__file__).parent.parent.parent
path_code_tasks = project_root.joinpath("src", "pydolphinscheduler", "tasks") path_code_tasks = project_root.joinpath("src", "pydolphinscheduler", "tasks")
path_example = project_root.joinpath("src", "pydolphinscheduler", "examples") path_example = project_root.joinpath("src", "pydolphinscheduler", "examples")
path_yaml_example = project_root.joinpath("examples", "yaml_define")
path_doc_tasks = project_root.joinpath("docs", "source", "tasks") path_doc_tasks = project_root.joinpath("docs", "source", "tasks")
path_default_config_yaml = project_root.joinpath( path_default_config_yaml = project_root.joinpath(
"src", "pydolphinscheduler", "default_config.yaml" "src", "pydolphinscheduler", "default_config.yaml"

Loading…
Cancel
Save