diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst index 20b0350078..f6d7e6ad8f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst @@ -31,3 +31,10 @@ Dive Into --------- .. automodule:: pydolphinscheduler.tasks.condition + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/Condition.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst index c07726941e..cb67a2fa9e 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst @@ -31,3 +31,16 @@ Dive Into --------- .. automodule:: pydolphinscheduler.tasks.datax + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/DataX.yaml + :start-after: # under the License. + :language: yaml + + +example_datax.json: + +.. literalinclude:: ../../../examples/yaml_define/example_datax.json + :language: json diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst index fe26d0f30a..d8e1599b2d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst @@ -31,3 +31,17 @@ Dive Into --------- .. automodule:: pydolphinscheduler.tasks.dependent + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/Dependent.yaml + :start-after: # under the License. + :language: yaml + +Dependent_External.yaml: + +.. literalinclude:: ../../../examples/yaml_define/Dependent_External.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst index 8db9ac266d..76eb484718 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst @@ -31,3 +31,10 @@ Dive Into --------- .. automodule:: pydolphinscheduler.tasks.flink + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/Flink.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst index 5f41b80cfd..a4a2972933 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst @@ -30,4 +30,4 @@ Example Dive Into --------- -.. automodule:: pydolphinscheduler.tasks.func_wrap \ No newline at end of file +.. 
automodule:: pydolphinscheduler.tasks.func_wrap diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst index 4c6d8f8e40..4e138c9989 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst @@ -19,3 +19,11 @@ HTTP ==== .. automodule:: pydolphinscheduler.tasks.http + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/Http.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst index 068b8d8b41..7356880b26 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst @@ -32,3 +32,11 @@ Dive Into --------- .. automodule:: pydolphinscheduler.tasks.map_reduce + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/MapReduce.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst index cd79eff140..2f28efc526 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst @@ -19,3 +19,11 @@ Procedure ========= .. automodule:: pydolphinscheduler.tasks.procedure + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/Procedure.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst index 660e46a6e5..1bf6210018 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst @@ -19,3 +19,11 @@ Python ====== .. automodule:: pydolphinscheduler.tasks.python + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/Python.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst index 5ce16c3c9f..2dd106a3b8 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst @@ -31,3 +31,11 @@ Dive Into --------- .. automodule:: pydolphinscheduler.tasks.shell + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/Shell.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst index cdb5902c37..d5a51db91a 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst @@ -31,3 +31,11 @@ Dive Into --------- .. automodule:: pydolphinscheduler.tasks.spark + + +YAML file example +----------------- + +.. 
literalinclude:: ../../../examples/yaml_define/Spark.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst index 21eaec7ae9..52df042b74 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst @@ -19,3 +19,17 @@ SQL === .. automodule:: pydolphinscheduler.tasks.sql + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/Sql.yaml + :start-after: # under the License. + :language: yaml + +example_sql.sql: + +.. literalinclude:: ../../../examples/yaml_define/example_sql.sql + :start-after: */ + :language: sql diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst index 8a9f562200..894dd0fbad 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst @@ -19,3 +19,20 @@ Sub Process =========== .. automodule:: pydolphinscheduler.tasks.sub_process + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/SubProcess.yaml + :start-after: # under the License. + :language: yaml + + + +example_subprocess.yaml: + +.. literalinclude:: ../../../examples/yaml_define/example_sub_workflow.yaml + :start-after: # under the License. + :language: yaml + diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst index d8b34a4ac9..2fef589efb 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst @@ -31,3 +31,12 @@ Dive Into --------- .. automodule:: pydolphinscheduler.tasks.switch + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/Switch.yaml + :start-after: # under the License. + :language: yaml + diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst index 6366c803bb..4efc09ae7b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst @@ -37,6 +37,8 @@ There are two types of tutorials: traditional and task decorator. versatility to the traditional way because it only supported Python functions and without build-in tasks supported. But it is helpful if your workflow is all built with Python or if you already have some Python workflow code and want to migrate them to pydolphinscheduler. +- **YAML File**: We can use pydolphinscheduler CLI to create process using YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`. + We can find more YAML file examples in `examples/yaml_define `_ .. tab:: Tradition @@ -52,6 +54,12 @@ There are two types of tutorials: traditional and task decorator. :start-after: [start tutorial] :end-before: [end tutorial] +.. tab:: YAML File + + .. literalinclude:: ../../examples/yaml_define/tutorial.yaml + :start-after: # under the License. + :language: yaml + Import Necessary Module ----------------------- @@ -104,6 +112,13 @@ will be running this task in the DolphinScheduler worker. 
See :ref:`section tena
   :start-after: [start workflow_declare]
   :end-before: [end workflow_declare]
+.. tab:: YAML File
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # under the License.
+      :end-before: # Define the tasks under the workflow
+      :language: yaml
+
 We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition `
 if you are interested in it. For all arguments of object process definition, you could find in the
 :class:`pydolphinscheduler.core.process_definition` API documentation.
@@ -139,6 +154,12 @@ Task Declaration
 It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we
 only use Python function as a task and could not use the :doc:`built-in tasks ` most of the cases.
+.. tab:: YAML File
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # Define the tasks under the workflow
+      :language: yaml
+
 Setting Task Dependence
 -----------------------
@@ -167,6 +188,14 @@ and task `task_child_two` was done, because both two task is `task_union`'s upst
    :start-after: [start task_relation_declare]
    :end-before: [end task_relation_declare]
+.. tab:: YAML File
+
+   We can use :code:`deps: []` to set task dependence
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # Define the tasks under the workflow
+      :language: yaml
+
 .. note::
    We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
@@ -198,6 +227,17 @@ will create workflow definition as well as workflow schedule.
    :start-after: [start submit_or_run]
    :end-before: [end submit_or_run]
+.. tab:: YAML File
+
+   The pydolphinscheduler YAML CLI always submits the workflow. We can also run the workflow if we set :code:`run: true`
+
+   .. code-block:: yaml
+
+      # Define the workflow
+      workflow:
+        name: "tutorial"
+        run: true
+
 At last, we could execute this workflow code in your terminal like other Python scripts, running
 :code:`python tutorial.py` to trigger and execute it.
@@ -219,5 +259,61 @@ named "tutorial" or "tutorial_decorator". The task graph of workflow like below:
    :language: text
    :lines: 24-28
+Create Process Using YAML File
+------------------------------
+
+We can use the pydolphinscheduler CLI to create a process using a YAML file:
+
+.. code-block:: bash
+
+   pydolphinscheduler yaml -f Shell.yaml
+
+We can use the following four special grammars to define workflows more flexibly.
+
+- :code:`$FILE{"file_name"}`: Read the file (:code:`file_name`) contents and replace them at that location.
+- :code:`$WORKFLOW{"other_workflow.yaml"}`: Refer to another process defined using a YAML file (:code:`other_workflow.yaml`) and replace the token with that process name at this location.
+- :code:`$ENV{env_name}`: Read the environment variable (:code:`env_name`) and replace it at that location.
+- :code:`${CONFIG.key_name}`: Read the configuration value of key (:code:`key_name`) and replace it at that location.
+
+
+In addition, when a path loaded with :code:`$FILE{"file_name"}` or :code:`$WORKFLOW{"other_workflow.yaml"}` does not exist, pydolphinscheduler will also search in the directory of the YAML file itself.
+
+For example, our file directory structure is as follows:
+
+.. code-block:: bash
+
+   .
+   └── yaml_define
+       ├── Condition.yaml
+       ├── DataX.yaml
+       ├── Dependent_External.yaml
+       ├── Dependent.yaml
+       ├── example_datax.json
+       ├── example_sql.sql
+       ├── example_sub_workflow.yaml
+       ├── Flink.yaml
+       ├── Http.yaml
+       ├── MapReduce.yaml
+       ├── MoreConfiguration.yaml
+       ├── Procedure.yaml
+       ├── Python.yaml
+       ├── Shell.yaml
+       ├── Spark.yaml
+       ├── Sql.yaml
+       ├── SubProcess.yaml
+       └── Switch.yaml
+
+After we run
+
+.. code-block:: bash
+
+   pydolphinscheduler yaml -f yaml_define/SubProcess.yaml
+
+
+the :code:`$WORKFLOW{"example_sub_workflow.yaml"}` will be set to :code:`$WORKFLOW{"yaml_define/example_sub_workflow.yaml"}`, because :code:`./example_sub_workflow.yaml` does not exist and :code:`yaml_define/example_sub_workflow.yaml` does.
+
+Furthermore, this feature supports recursion all the way down.
+

 .. _`DolphinScheduler project page`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project.html
 .. _`Python context manager`: https://docs.python.org/3/library/stdtypes.html#context-manager-types
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml
new file mode 100644
index 0000000000..c65b8c7aeb
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Condition"
+
+# Define the tasks under the workflow
+tasks:
+  - { "task_type": "Shell", "name": "pre_task_1", "command": "echo pre_task_1" }
+  - { "task_type": "Shell", "name": "pre_task_2", "command": "echo pre_task_2" }
+  - { "task_type": "Shell", "name": "pre_task_3", "command": "echo pre_task_3" }
+  - { "task_type": "Shell", "name": "success_branch", "command": "echo success_branch" }
+  - { "task_type": "Shell", "name": "fail_branch", "command": "echo fail_branch" }
+
+  - name: condition
+    task_type: Condition
+    success_task: success_branch
+    failed_task: fail_branch
+    op: AND
+    groups:
+      - op: AND
+        groups:
+          - task: pre_task_1
+            flag: true
+          - task: pre_task_2
+            flag: true
+          - task: pre_task_3
+            flag: false
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml
new file mode 100644
index 0000000000..00ecd54685
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "DataX" + +# Define the tasks under the workflow +tasks: + - name: task + task_type: DataX + datasource_name: db + datatarget_name: db + sql: show tables; + target_table: table_test + + - name: task_custon_config + task_type: CustomDataX + json: $FILE{"example_datax.json"} diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml new file mode 100644 index 0000000000..d69fac05da --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
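+# A Dependent task gates on tasks that belong to other workflows: the top-level
+# "op" combines the groups below it, and each group's own "op" combines its
+# dependent items (project name, workflow name, task name, plus an optional
+# dependent_date such as LAST_WEDNESDAY or last24Hours).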
+ +workflow: + name: "Dependent" + +# Define the tasks under the workflow +tasks: + - name: dependent + task_type: Dependent + denpendence: + op: and + groups: + - op: or + groups: + - project_name: pydolphin + process_definition_name: task_dependent_external + dependent_task_name: task_1 + + - project_name: pydolphin + process_definition_name: task_dependent_external + dependent_task_name: task_2 + + - op: and + groups: + - project_name: pydolphin + process_definition_name: task_dependent_external + dependent_task_name: task_1 + dependent_date: LAST_WEDNESDAY + + - project_name: pydolphin + process_definition_name: task_dependent_external + dependent_task_name: task_2 + dependent_date: last24Hours + + - name: dependent_var + task_type: Dependent + denpendence: + op: and + groups: + - op: or + # we can use ${CONFIG.WORKFLOW_PROJECT} to set the value to configuration.WORKFLOW_PROJECT + # we can use $WORKFLOW{"Dependent_External.yaml"} to create or update a workflow from dependent_external.yaml and set the value to that workflow name + groups: + - project_name: ${CONFIG.WORKFLOW_PROJECT} + process_definition_name: $WORKFLOW{"Dependent_External.yaml"} + dependent_task_name: task_1 + + - project_name: ${CONFIG.WORKFLOW_PROJECT} + process_definition_name: $WORKFLOW{"Dependent_External.yaml"} + dependent_task_name: task_2 + - op: and + groups: + - project_name: ${CONFIG.WORKFLOW_PROJECT} + process_definition_name: $WORKFLOW{"Dependent_External.yaml"} + dependent_task_name: task_1 + dependent_date: LAST_WEDNESDAY + + - project_name: ${CONFIG.WORKFLOW_PROJECT} + process_definition_name: $WORKFLOW{"Dependent_External.yaml"} + dependent_task_name: task_2 + dependent_date: last24Hours diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml new file mode 100644 index 0000000000..577ff6a807 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
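+# This workflow is the target of $WORKFLOW{"Dependent_External.yaml"} in
+# Dependent.yaml: parsing that reference submits this definition first and then
+# substitutes the token with the workflow name "task_dependent_external".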
+ +# Define the workflow +workflow: + name: "task_dependent_external" + +# Define the tasks under the workflow +tasks: + - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" } + - { "task_type": "Shell", "name": "task_2", "command": "echo task 2" } + - { "task_type": "Shell", "name": "task_3", "command": "echo task 3" } diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml new file mode 100644 index 0000000000..2449d435a3 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "Flink" + +# Define the tasks under the workflow +tasks: + - name: task + task_type: Flink + main_class: org.apache.flink.streaming.examples.wordcount.WordCount + main_package: test_java.jar + program_type: JAVA + deploy_mode: local diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml new file mode 100644 index 0000000000..1483aeb3d8 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
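+# Each http_params entry is sent with the request as a parameter or header;
+# http_check_condition STATUS_CODE_CUSTOM together with condition "404" checks
+# the response against that custom status code instead of the default rule.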
+ +# Define the workflow +workflow: + name: "Http" + +# Define the tasks under the workflow +tasks: + - name: task + task_type: Http + url: "https://httpbin.org/get" + http_method: "GET" + http_params: + - { "prop": "a", "httpParametersType": "PARAMETER", "value": "1" } + - { "prop": "b", "httpParametersType": "PARAMETER", "value": "2" } + - { + "prop": "Content-Type", + "httpParametersType": "header", + "value": "test", + } + http_check_condition: "STATUS_CODE_CUSTOM" + condition: "404" diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml new file mode 100644 index 0000000000..e1a2b5709c --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "MapReduce" + +# Define the tasks under the workflow +tasks: + - name: task + task_type: MR + main_class: wordcount + main_package: test_java.jar + program_type: SCALA + main_args: /dolphinscheduler/tenant_exists/resources/file.txt /output/ds diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml new file mode 100644 index 0000000000..258aa33433 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
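+# This example shows optional task attributes (priority, retry, timeout) and
+# parameter passing: $ENV{HOME} is substituted from the local environment when
+# the YAML file is parsed, while ${n} comes from the workflow-level "param"
+# section and is resolved by DolphinScheduler at run time via local_params.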
+ +# Define the workflow +workflow: + name: "MoreConfiguration" + param: + n: 1 + +# Define the tasks under the workflow +tasks: + - name: shell_0 + task_type: Shell + description: "yaml define task" + flag: "YES" + command: | + echo "$ENV{HOME}" + echo "${n}" + task_priority: "HIGH" + delay_time: 20 + fail_retry_times: 30 + fail_retry_interval: 5 + timeout_flag: "CLOSE" + timeout: 60 + local_params: + - { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" } diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml new file mode 100644 index 0000000000..829a961c1a --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "Procedure" + +# Define the tasks under the workflow +tasks: + - name: task + task_type: Procedure + datasource_name: db + method: show tables; diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml new file mode 100644 index 0000000000..728b5c928e --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
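+# The multi-line "definition" below is the Python code that the task executes
+# on the DolphinScheduler worker.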
+ +# Define the workflow +workflow: + name: "Python" + +# Define the tasks under the workflow +tasks: + - name: python + task_type: Python + definition: | + import os + print(os) + print("1") + print("2") diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml new file mode 100644 index 0000000000..fdbe126327 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "Shell" + release_state: "offline" + run: true + +# Define the tasks under the process +tasks: + - name: task_parent + task_type: Shell + command: | + echo hello pydolphinscheduler + echo run task parent + + - name: task_child_one + task_type: Shell + deps: [task_parent] + command: echo "child one" + + - name: task_child_two + task_type: Shell + deps: [task_parent] + command: echo "child two" diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml new file mode 100644 index 0000000000..6132b8d749 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
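+# The Spark task takes the same main_class / main_package / program_type /
+# deploy_mode fields as the Flink example above, plus spark_version to pick
+# the Spark release the job is submitted with.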
+ +# Define the workflow +workflow: + name: "Spark" + +# Define the tasks under the workflow +tasks: + - name: task + task_type: Spark + main_class: org.apache.spark.examples.SparkPi + main_package: test_java.jar + program_type: SCALA + deploy_mode: local + spark_version: SPARK1 diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml new file mode 100644 index 0000000000..c3c7e88ee1 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "Sql" + +# Define the tasks under the workflow +tasks: + - name: task_base + task_type: Sql + datasource_name: "db" + sql: show tables; + + - name: task_multi_line + task_type: Sql + datasource_name: "db" + sql: | + show tables; + select id from version where id=1; + + - name: task_file + task_type: Sql + datasource_name: "db" + sql: $FILE{"example_sql.sql"} + + # Or you can define task "task_union" it with one line + - { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"} + + # Or you can define task "task_union" it with one line + - { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'} diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml new file mode 100644 index 0000000000..0ea7549db4 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
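+# process_definition_name is given as $WORKFLOW{"example_sub_workflow.yaml"}:
+# that file is parsed and submitted first, and the token is then replaced by
+# the resulting workflow name "example_workflow_for_sub_workflow".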
+ +# Define the workflow +workflow: + name: "SubWorkflow" + +tasks: + - name: example_workflow + task_type: SubProcess + process_definition_name: $WORKFLOW{"example_sub_workflow.yaml"} + + - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" } diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml new file mode 100644 index 0000000000..33ed68813e --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "Switch" + param: + var: 1 + +# Define the tasks under the workflow +tasks: + - name: switch_child_1 + task_type: Shell + command: echo switch_child_1 + + - name: switch_child_2 + task_type: Shell + command: echo switch_child_2 + + - name: switch + task_type: Switch + condition: + - task: switch_child_1 + condition: "${var} > 1" + - task: switch_child_2 diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json new file mode 100644 index 0000000000..3db8092cb6 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json @@ -0,0 +1,62 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "usr", + "password": "pwd", + "column": [ + "id", + "name", + "code", + "description" + ], + "splitPk": "id", + "connection": [ + { + "table": [ + "source_table" + ], + "jdbcUrl": [ + "jdbc:mysql://127.0.0.1:3306/source_db" + ] + } + ] + } + }, + "writer": { + "name": "mysqlwriter", + "parameter": { + "writeMode": "insert", + "username": "usr", + "password": "pwd", + "column": [ + "id", + "name" + ], + "connection": [ + { + "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db", + "table": [ + "target_table" + ] + } + ] + } + } + } + ], + "setting": { + "errorLimit": { + "percentage": 0, + "record": 0 + }, + "speed": { + "channel": 1, + "record": 1000 + } + } + } +} diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql new file mode 100644 index 0000000000..06b5c4c16c --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +select id from version where id=1; +select id from version where id=2; +select id from version where id=3; +select id from version where id=4; +select id from version where id=5; diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml new file mode 100644 index 0000000000..af3a863da9 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "example_workflow_for_sub_workflow" + +# Define the tasks under the workflow +tasks: + - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" } + - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" } + - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" } diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml new file mode 100644 index 0000000000..104a8c367b --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "tutorial" + schedule: "0 0 0 * * ? 
*" + start_time: "2021-01-01" + tenant: "tenant_exists" + release_state: "offline" + run: true + +# Define the tasks under the workflow +tasks: + - name: task_parent + task_type: Shell + command: echo hello pydolphinscheduler + + - name: task_child_one + task_type: Shell + deps: [task_parent] + command: echo "child one" + + - name: task_child_two + task_type: Shell + deps: [task_parent] + command: echo "child two" + + - name: task_union + task_type: Shell + deps: [task_child_one, task_child_two] + command: echo "union" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py index d78e503dbb..8d923f1406 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py @@ -26,6 +26,7 @@ from pydolphinscheduler.configuration import ( init_config_file, set_single_config, ) +from pydolphinscheduler.core.yaml_process_define import create_process_definition version_option_val = ["major", "minor", "micro"] @@ -90,3 +91,16 @@ def config(getter, setter, init) -> None: for key, val in setter: set_single_config(key, val) click.echo("Set configuration done.") + + +@cli.command() +@click.option( + "--yaml_file", + "-f", + required=True, + help="YAML file path", + type=click.Path(exists=True), +) +def yaml(yaml_file) -> None: + """Create process definition using YAML file.""" + create_process_definition(yaml_file) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py new file mode 100644 index 0000000000..0944925a48 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py @@ -0,0 +1,466 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""Parse YAML file to create process.""" + +import logging +import os +import re +from pathlib import Path +from typing import Any, Dict + +from pydolphinscheduler import configuration, tasks +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.core.task import Task +from pydolphinscheduler.exceptions import PyDSTaskNoFoundException +from pydolphinscheduler.utils.yaml_parser import YamlParser + +logger = logging.getLogger(__file__) + +KEY_PROCESS = "workflow" +KEY_TASK = "tasks" +KEY_TASK_TYPE = "task_type" +KEY_DEPS = "deps" +KEY_OP = "op" + +TASK_SPECIAL_KEYS = [KEY_TASK_TYPE, KEY_DEPS] + + +class ParseTool: + """Enhanced parsing tools.""" + + @staticmethod + def parse_string_param_if_file(string_param: str, **kwargs): + """Use $FILE{"data_path"} to load file from "data_path".""" + if string_param.startswith("$FILE"): + path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0] + base_folder = kwargs.get("base_folder", ".") + path = ParseTool.get_possible_path(path, base_folder) + with open(path, "r") as read_file: + string_param = "".join(read_file) + return string_param + + @staticmethod + def parse_string_param_if_env(string_param: str, **kwargs): + """Use $ENV{env_name} to load environment variable "env_name".""" + if "$ENV" in string_param: + key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0] + env_value = os.environ.get(key, "$%s" % key) + string_param = string_param.replace("$ENV{%s}" % key, env_value) + return string_param + + @staticmethod + def parse_string_param_if_config(string_param: str, **kwargs): + """Use ${CONFIG.var_name} to load variable "var_name" from configuration.""" + if "${CONFIG" in string_param: + key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0] + if hasattr(configuration, key): + string_param = getattr(configuration, key) + else: + string_param = configuration.get_single_config(key) + + return string_param + + @staticmethod + def get_possible_path(file_path, base_folder): + """Get file possible path. + + Return new path if file_path is not exists, but base_folder + file_path exists + """ + possible_path = file_path + if not Path(file_path).exists(): + new_path = Path(base_folder).joinpath(file_path) + if new_path.exists(): + possible_path = new_path + logger.info(f"{file_path} not exists, convert to {possible_path}") + + return possible_path + + +def get_task_cls(task_type) -> Task: + """Get the task class object by task_type (case compatible).""" + # only get task class from tasks.__all__ + all_task_types = {type_.capitalize(): type_ for type_ in tasks.__all__} + task_type_cap = task_type.capitalize() + if task_type_cap not in all_task_types: + raise PyDSTaskNoFoundException("cant not find task %s" % task_type) + + standard_name = all_task_types[task_type_cap] + return getattr(tasks, standard_name) + + +class YamlProcess(YamlParser): + """Yaml parser for create process. + + :param yaml_file: yaml file path. + + examples1 :: + + parser = YamlParser(yaml_file=...) 
+ parser.create_process_definition() + + examples2 :: + + YamlParser(yaml_file=...).create_process_definition() + + """ + + _parse_rules = [ + ParseTool.parse_string_param_if_file, + ParseTool.parse_string_param_if_env, + ParseTool.parse_string_param_if_config, + ] + + def __init__(self, yaml_file: str): + with open(yaml_file, "r") as f: + content = f.read() + + self._base_folder = Path(yaml_file).parent + content = self.prepare_refer_process(content) + super().__init__(content) + + def create_process_definition(self): + """Create process main function.""" + # get process parameters with key "workflow" + process_params = self[KEY_PROCESS] + + # pop "run" parameter, used at the end + is_run = process_params.pop("run", False) + + # use YamlProcess._parse_rules to parse special value of yaml file + process_params = self.parse_params(process_params) + + process_name = process_params["name"] + logger.info(f"Create Process: {process_name}") + with ProcessDefinition(**process_params) as pd: + + # save dependencies between tasks + dependencies = {} + + # save name and task mapping + name2task = {} + + # get task datas with key "tasks" + for task_data in self[KEY_TASK]: + task = self.parse_task(task_data, name2task) + + deps = task_data.get(KEY_DEPS, []) + if deps: + dependencies[task.name] = deps + name2task[task.name] = task + + # build dependencies between task + for downstream_task_name, deps in dependencies.items(): + downstream_task = name2task[downstream_task_name] + for upstream_task_name in deps: + upstream_task = name2task[upstream_task_name] + upstream_task >> downstream_task + + pd.submit() + # if set is_run, run the process after submit + if is_run: + logger.info(f"run workflow: {pd}") + pd.run() + + return process_name + + def parse_params(self, params: Any): + """Recursively resolves the parameter values. + + The function operates params only when it encounters a string; other types continue recursively. + """ + if isinstance(params, str): + for parse_rule in self._parse_rules: + params_ = params + params = parse_rule(params, base_folder=self._base_folder) + if params_ != params: + logger.info(f"parse {params_} -> {params}") + + elif isinstance(params, list): + for index in range(len(params)): + params[index] = self.parse_params(params[index]) + + elif isinstance(params, dict): + for key, value in params.items(): + params[key] = self.parse_params(value) + + return params + + @classmethod + def parse(cls, yaml_file: str): + """Recursively resolves the parameter values. + + The function operates params only when it encounters a string; other types continue recursively. + """ + process_name = cls(yaml_file).create_process_definition() + return process_name + + def prepare_refer_process(self, content): + """Allow YAML files to reference process derived from other YAML files.""" + process_paths = re.findall(r"\$WORKFLOW\{\"(.*?)\"\}", content) + for process_path in process_paths: + logger.info( + f"find special token {process_path}, load process form {process_path}" + ) + possible_path = ParseTool.get_possible_path(process_path, self._base_folder) + process_name = YamlProcess.parse(possible_path) + content = content.replace('$WORKFLOW{"%s"}' % process_path, process_name) + + return content + + def parse_task(self, task_data: dict, name2task: Dict[str, Task]): + """Parse various types of tasks. + + :param task_data: dict. 
+ { + "task_type": "Shell", + "params": {"name": "shell_task", "command":"ehco hellp"} + } + + :param name2task: Dict[str, Task]), mapping of task_name and task + + + Some task type have special parse func: + if task type is Switch, use parse_switch; + if task type is Condition, use parse_condition; + if task type is Dependent, use parse_dependent; + other, we pass all task_params as input to task class, like "task_cls(**task_params)". + """ + task_type = task_data["task_type"] + # get params without special key + task_params = {k: v for k, v in task_data.items() if k not in TASK_SPECIAL_KEYS} + + task_cls = get_task_cls(task_type) + + # use YamlProcess._parse_rules to parse special value of yaml file + task_params = self.parse_params(task_params) + + if task_cls == tasks.Switch: + task = self.parse_switch(task_params, name2task) + + elif task_cls == tasks.Condition: + task = self.parse_condition(task_params, name2task) + + elif task_cls == tasks.Dependent: + task = self.parse_dependent(task_params, name2task) + + else: + task = task_cls(**task_params) + logger.info(task_type, task) + return task + + def parse_switch(self, task_params, name2task): + """Parse Switch Task. + + This is an example Yaml fragment of task_params + + name: switch + condition: + - ["${var} > 1", switch_child_1] + - switch_child_2 + """ + from pydolphinscheduler.tasks.switch import ( + Branch, + Default, + Switch, + SwitchCondition, + ) + + condition_datas = task_params["condition"] + conditions = [] + for condition_data in condition_datas: + assert "task" in condition_data, "task must be in %s" % condition_data + task_name = condition_data["task"] + condition_string = condition_data.get("condition", None) + + # if condition_string is None, for example: {"task": "switch_child_2"}, set it to Default branch + if condition_string is None: + conditions.append(Default(task=name2task.get(task_name))) + + # if condition_string is not None, for example: + # {"task": "switch_child_2", "condition": "${var} > 1"} set it to Branch + else: + conditions.append( + Branch(condition_string, task=name2task.get(task_name)) + ) + + switch = Switch( + name=task_params["name"], condition=SwitchCondition(*conditions) + ) + return switch + + def parse_condition(self, task_params, name2task): + """Parse Condition Task. 
+ + This is an example Yaml fragment of task_params + + name: condition + success_task: success_branch + failed_task: fail_branch + OP: AND + groups: + - + OP: AND + groups: + - [pre_task_1, true] + - [pre_task_2, true] + - [pre_task_3, false] + - + OP: AND + groups: + - [pre_task_1, false] + - [pre_task_2, true] + - [pre_task_3, true] + + """ + from pydolphinscheduler.tasks.condition import ( + FAILURE, + SUCCESS, + And, + Condition, + Or, + ) + + def get_op_cls(op): + cls = None + if op.lower() == "and": + cls = And + elif op.lower() == "or": + cls = Or + else: + raise Exception("OP must be in And or Or, but get: %s" % op) + return cls + + second_cond_ops = [] + for first_group in task_params["groups"]: + second_op = first_group["op"] + task_ops = [] + for condition_data in first_group["groups"]: + assert "task" in condition_data, "task must be in %s" % condition_data + assert "flag" in condition_data, "flag must be in %s" % condition_data + task_name = condition_data["task"] + flag = condition_data["flag"] + task = name2task[task_name] + + # for example: task = pre_task_1, flag = true + if flag: + task_ops.append(SUCCESS(task)) + else: + task_ops.append(FAILURE(task)) + + second_cond_ops.append(get_op_cls(second_op)(*task_ops)) + + first_op = task_params["op"] + cond_operator = get_op_cls(first_op)(*second_cond_ops) + + condition = Condition( + name=task_params["name"], + condition=cond_operator, + success_task=name2task[task_params["success_task"]], + failed_task=name2task[task_params["failed_task"]], + ) + return condition + + def parse_dependent(self, task_params, name2task): + """Parse Dependent Task. + + This is an example Yaml fragment of task_params + + name: dependent + denpendence: + OP: AND + groups: + - + OP: Or + groups: + - [pydolphin, task_dependent_external, task_1] + - [pydolphin, task_dependent_external, task_2] + - + OP: And + groups: + - [pydolphin, task_dependent_external, task_1, LAST_WEDNESDAY] + - [pydolphin, task_dependent_external, task_2, last24Hours] + + """ + from pydolphinscheduler.tasks.dependent import ( + And, + Dependent, + DependentDate, + DependentItem, + Or, + ) + + def process_dependent_date(dependent_date): + """Parse dependent date (Compatible with key and value of DependentDate).""" + dependent_date_upper = dependent_date.upper() + if hasattr(DependentDate, dependent_date_upper): + dependent_date = getattr(DependentDate, dependent_date_upper) + return dependent_date + + def get_op_cls(op): + cls = None + if op.lower() == "and": + cls = And + elif op.lower() == "or": + cls = Or + else: + raise Exception("OP must be in And or Or, but get: %s" % op) + return cls + + def create_dependent_item(source_items): + """Parse dependent item. 
+
+            project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_1
+            dependent_date: LAST_WEDNESDAY
+            """
+            project_name = source_items["project_name"]
+            process_definition_name = source_items["process_definition_name"]
+            dependent_task_name = source_items["dependent_task_name"]
+            dependent_date = source_items.get("dependent_date", DependentDate.TODAY)
+            dependent_item = DependentItem(
+                project_name=project_name,
+                process_definition_name=process_definition_name,
+                dependent_task_name=dependent_task_name,
+                dependent_date=process_dependent_date(dependent_date),
+            )
+
+            return dependent_item
+
+        second_dependences = []
+        for first_group in task_params["groups"]:
+            second_op = first_group[KEY_OP]
+            dependence_items = []
+            for source_items in first_group["groups"]:
+                dependence_items.append(create_dependent_item(source_items))
+
+            second_dependences.append(get_op_cls(second_op)(*dependence_items))
+
+        first_op = task_params[KEY_OP]
+        dependence = get_op_cls(first_op)(*second_dependences)
+
+        task = Dependent(
+            name=task_params["name"],
+            dependence=dependence,
+        )
+        return task
+
+
+def create_process_definition(yaml_file):
+    """Create the process definition from a YAML file (CLI entry point)."""
+    YamlProcess.parse(yaml_file)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index d49a1d394c..53b462ca90 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -35,6 +35,7 @@ from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondi
 __all__ = [
     "Condition",
     "DataX",
+    "CustomDataX",
     "Dependent",
     "Flink",
     "Http",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py
new file mode 100644
index 0000000000..99ad179a5f
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test YAML process."""
+
+import os
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.yaml_process_define import (
+    ParseTool,
+    create_process_definition,
+    get_task_cls,
+)
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from tests.testing.path import path_yaml_example
+from tests.testing.task import Task
+
+
+@pytest.mark.parametrize(
+    "string_param, expect",
+    [
+        ("$ENV{PROJECT_NAME}", "~/pydolphinscheduler"),
+    ],
+)
+def test_parse_tool_env_exist(string_param, expect):
+    """Test parsing an existing environment variable."""
+    os.environ["PROJECT_NAME"] = expect
+    assert expect == ParseTool.parse_string_param_if_env(string_param)
+
+
+def test_parse_tool_env_not_exist():
+    """Test parsing a non-existent environment variable."""
+    key = "THIS_ENV_NOT_EXIST_0000000"
+    string_param = "$ENV{%s}" % key
+    expect = "$" + key
+    assert expect == ParseTool.parse_string_param_if_env(string_param)
+
+
+@pytest.mark.parametrize(
+    "string_param, expect_key",
+    [
+        ("${CONFIG.java_gateway.address}", "java_gateway.address"),
+        ("${CONFIG.WORKFLOW_PROJECT}", "default.workflow.project"),
+    ],
+)
+def test_parse_tool_config(string_param, expect_key):
+    """Test parsing configuration."""
+    expect = configuration.get_single_config(expect_key)
+    assert expect == ParseTool.parse_string_param_if_config(string_param)
+
+
+def test_parse_possible_yaml_file():
+    """Test parsing a YAML file referenced via $FILE{}."""
+    folder = Path(path_yaml_example)
+    file_name = "Shell.yaml"
+    path = folder.joinpath(file_name)
+
+    with open(path, "r") as f:
+        expect = "".join(f)
+
+    string_param = '$FILE{"%s"}' % file_name
+    content_ = ParseTool.parse_string_param_if_file(string_param, base_folder=folder)
+
+    assert expect == content_
+
+
+def test_parse_tool_parse_possible_path_file():
+    """Test resolving a possible path with get_possible_path."""
+    folder = Path(path_yaml_example)
+    file_name = "Shell.yaml"
+    path = folder.joinpath(file_name)
+
+    possible_path = ParseTool.get_possible_path(path, base_folder=folder)
+    assert path == possible_path
+
+    possible_path = ParseTool.get_possible_path(file_name, base_folder=folder)
+    assert path == possible_path
+
+    possible_path = ParseTool.get_possible_path(file_name, base_folder=".")
+    assert path != possible_path
+
+
+@pytest.mark.parametrize(
+    "task_type, expect",
+    [
+        ("shell", tasks.Shell),
+        ("Shell", tasks.Shell),
+        ("ShEll", tasks.Shell),
+        ("Condition", tasks.Condition),
+        ("DataX", tasks.DataX),
+        ("CustomDataX", tasks.CustomDataX),
+        ("Dependent", tasks.Dependent),
+        ("Flink", tasks.Flink),
+        ("Http", tasks.Http),
+        ("MR", tasks.MR),
+        ("Procedure", tasks.Procedure),
+        ("Python", tasks.Python),
+        ("Shell", tasks.Shell),
+        ("Spark", tasks.Spark),
+        ("Sql", tasks.Sql),
+        ("SubProcess", tasks.SubProcess),
+        ("Switch", tasks.Switch),
+        ("SageMaker", tasks.SageMaker),
+    ],
+)
+def test_get_task(task_type, expect):
+    """Test the get_task_cls function."""
+    assert expect == get_task_cls(task_type)
+
+
+@pytest.mark.parametrize(
+    "task_type",
+    [
+        ("MYSQL"),
+    ],
+)
+def test_get_error(task_type):
+    """Test that get_task_cls raises for an unknown task type."""
+    with pytest.raises(
+        PyDSTaskNoFoundException,
+        match=f"not find task {task_type}",
+    ):
+        get_task_cls(task_type)
+
+
+@pytest.mark.parametrize(
+    "yaml_file",
+    [
+        ("Condition.yaml"),
+        ("DataX.yaml"),
+        ("Dependent.yaml"),
+        ("Flink.yaml"),
+        ("Procedure.yaml"),
+        ("Http.yaml"),
("MapReduce.yaml"), + ("Python.yaml"), + ("Shell.yaml"), + ("Spark.yaml"), + ("Sql.yaml"), + ("SubProcess.yaml"), + # ("Switch.yaml"), + ("MoreConfiguration.yaml"), + ], +) +@patch( + "pydolphinscheduler.core.engine.Engine.get_resource_info", + return_value=({"id": 1, "name": "test"}), +) +@patch( + "pydolphinscheduler.core.database.Database.get_database_info", + return_value=({"id": 1, "type": "mock_type"}), +) +@patch( + "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway", + return_value={ + "projectCode": 0, + "processDefinitionCode": 0, + "taskDefinitionCode": 0, + }, +) +@patch.object(ProcessDefinition, "run") +@patch.object(ProcessDefinition, "submit") +def test_get_create_process_definition( + prun, psubmit, dep_item, db_info, resource_info, yaml_file +): + """Test create_process_definition function to parse example YAML file.""" + yaml_file_path = Path(path_yaml_example).joinpath(yaml_file) + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version, + ): + create_process_definition(yaml_file_path) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py index 68d93c404b..974ab3d47c 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py @@ -24,6 +24,7 @@ project_root = Path(__file__).parent.parent.parent path_code_tasks = project_root.joinpath("src", "pydolphinscheduler", "tasks") path_example = project_root.joinpath("src", "pydolphinscheduler", "examples") +path_yaml_example = project_root.joinpath("examples", "yaml_define") path_doc_tasks = project_root.joinpath("docs", "source", "tasks") path_default_config_yaml = project_root.joinpath( "src", "pydolphinscheduler", "default_config.yaml"