diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst index 30173f838b..a13652a526 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst @@ -42,3 +42,4 @@ In this section sub_process sagemaker + pytorch diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst new file mode 100644 index 0000000000..4c7a5521fb --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst @@ -0,0 +1,42 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Pytorch +======= + + +An example of the Pytorch task type, followed by a detailed look at its **PyDolphinScheduler** API. + +Example +------- + +.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_pytorch_example.py + :start-after: [start workflow_declare] + :end-before: [end workflow_declare] + +Dive Into +--------- + +.. automodule:: pydolphinscheduler.tasks.pytorch + + +YAML file example +----------------- + +.. 
literalinclude:: ../../../examples/yaml_define/Pytorch.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml new file mode 100644 index 0000000000..8706824245 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
# Define the workflow
workflow:
  name: "Pytorch"

# Define the tasks under the workflow.
# All three tasks run the same script with the same arguments from the same
# project; they differ only in how the Python environment is provided.
tasks:

  # run project with existing environment
  - name: task_existing_env
    task_type: pytorch
    script: main.py
    script_params: --dry-run --no-cuda
    project_path: https://github.com/pytorch/examples#mnist
    python_command: /home/anaconda3/envs/pytorch/bin/python3


  # run project with creating conda environment
  - name: task_conda_env
    task_type: pytorch
    script: main.py
    script_params: --dry-run --no-cuda
    project_path: https://github.com/pytorch/examples#mnist
    is_create_environment: True
    python_env_tool: conda
    requirements: requirements.txt
    # Quoted on purpose: an unquoted 3.7 is read by YAML as a float, while the
    # Python API takes this value as a string ("3.7"). Quoting also keeps a
    # version such as "3.10" from collapsing to 3.1.
    conda_python_version: "3.7"

  # run project with creating virtualenv environment
  - name: task_virtualenv_env
    task_type: pytorch
    script: main.py
    script_params: --dry-run --no-cuda
    project_path: https://github.com/pytorch/examples#mnist
    is_create_environment: True
    python_env_tool: virtualenv
    requirements: requirements.txt
# [start workflow_declare]
"""Example workflow declaring three Pytorch tasks.

Every task runs the same script with the same arguments from the same
project; the tasks differ only in how the Python environment is supplied.
"""

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.pytorch import Pytorch

# Arguments shared by every task declared below.
mnist_job = {
    "script": "main.py",
    "script_params": "--dry-run --no-cuda",
    "project_path": "https://github.com/pytorch/examples#mnist",
}

with ProcessDefinition(name="task_pytorch_example", tenant="tenant_exists") as pd:

    # reuse an interpreter that already exists
    task_existing_env = Pytorch(
        name="task_existing_env",
        python_command="/home/anaconda3/envs/pytorch/bin/python3",
        **mnist_job,
    )

    # create a conda environment for the run
    task_conda_env = Pytorch(
        name="task_conda_env",
        is_create_environment=True,
        python_env_tool="conda",
        requirements="requirements.txt",
        conda_python_version="3.7",
        **mnist_job,
    )

    # create a virtualenv environment for the run
    task_virtualenv_env = Pytorch(
        name="task_virtualenv_env",
        is_create_environment=True,
        python_env_tool="virtualenv",
        requirements="requirements.txt",
        **mnist_job,
    )

    pd.submit()
# [end workflow_declare]
"""Task Pytorch."""
from typing import Optional

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task


class DEFAULT:
    """Baseline option values that ``Pytorch.other_params`` compares against."""

    python_command = "${PYTHON_HOME}"
    project_path = "."
    is_create_environment = False


class Pytorch(Task):
    """Pytorch task declaration for dolphinscheduler.

    See also the DolphinScheduler Pytorch Task Plugin documentation.

    :param name: task name
    :param script: entry Python script file to run.
    :param script_params: input parameters given to the script at run time.
    :param project_path: path to the project, stored on the task as
        ``python_path``. Default ".".
    :param is_create_environment: whether to create an environment. Default False.
    :param python_command: path to the python command. Default "${PYTHON_HOME}".
    :param python_env_tool: python environment tool. Default "conda".
    :param requirements: path to the requirements.txt file. Default "requirements.txt".
    :param conda_python_version: python version of the conda environment. Default "3.7".
    """

    # Task-specific attribute names declared for the base ``Task``.
    # NOTE(review): presumably the base class serializes each one into
    # ``taskParams`` under a camelCase key (the unit test expects exactly
    # that) -- confirm against ``Task``.
    _task_custom_attr = {
        "conda_python_version",
        "is_create_environment",
        "other_params",
        "python_command",
        "python_env_tool",
        "python_path",
        "requirements",
        "script",
        "script_params",
    }

    def __init__(
        self,
        name: str,
        script: str,
        script_params: str = "",
        project_path: Optional[str] = DEFAULT.project_path,
        is_create_environment: Optional[bool] = DEFAULT.is_create_environment,
        python_command: Optional[str] = DEFAULT.python_command,
        python_env_tool: Optional[str] = "conda",
        requirements: Optional[str] = "requirements.txt",
        conda_python_version: Optional[str] = "3.7",
        *args,
        **kwargs,
    ):
        """Create the task and record its Pytorch-specific settings."""
        super().__init__(name, TaskType.PYTORCH, *args, **kwargs)

        # what to run
        self.script = script
        self.script_params = script_params
        # ``project_path`` is kept under the attribute name ``python_path``
        self.python_path = project_path

        # how the interpreter / environment is obtained
        self.python_command = python_command
        self.is_create_environment = is_create_environment
        self.python_env_tool = python_env_tool
        self.requirements = requirements
        self.conda_python_version = conda_python_version

    @property
    def other_params(self):
        """Tell whether any tracked option differs from its ``DEFAULT`` value.

        Tracked options are ``is_create_environment``, ``python_path`` and
        ``python_command``.
        """
        tracked = (
            (self.is_create_environment, DEFAULT.is_create_environment),
            (self.python_path, DEFAULT.project_path),
            (self.python_command, DEFAULT.python_command),
        )
        return any(current != baseline for current, baseline in tracked)
"""Test Task Pytorch."""
from copy import deepcopy
from unittest.mock import patch

import pytest

from pydolphinscheduler.tasks.pytorch import DEFAULT, Pytorch
from tests.testing.task import Task

# Fixed code/version returned by the patched ``gen_code_and_version``.
CODE = 123
VERSION = 1

# Task definition shared by the tests; each test deep-copies it and adds the
# Pytorch-specific entries it expects.
EXPECT = {
    "code": CODE,
    "version": VERSION,
    "description": None,
    "delayTime": 0,
    "taskType": "PYTORCH",
    "taskParams": {
        "resourceList": [],
        "localParams": [],
        "dependence": {},
        "conditionResult": {"successNode": [""], "failedNode": [""]},
        "waitStartTimeout": {},
    },
    "flag": "YES",
    "taskPriority": "MEDIUM",
    "workerGroup": "default",
    "environmentCode": None,
    "failRetryTimes": 0,
    "failRetryInterval": 1,
    "timeoutFlag": "CLOSE",
    "timeoutNotifyStrategy": None,
    "timeout": 0,
}


def test_pytorch_get_define():
    """Check ``get_define`` output for a task that creates a conda environment."""
    name = "task_conda_env"
    script = "main.py"
    script_params = "--dry-run --no-cuda"
    project_path = "https://github.com/pytorch/examples#mnist"
    is_create_environment = True
    python_env_tool = "conda"
    requirements = "requirements.txt"
    conda_python_version = "3.7"

    expect = deepcopy(EXPECT)
    expect["name"] = name
    task_params = expect["taskParams"]

    task_params["script"] = script
    task_params["scriptParams"] = script_params
    task_params["pythonPath"] = project_path
    # True because project_path and is_create_environment both differ from DEFAULT.
    task_params["otherParams"] = True
    task_params["isCreateEnvironment"] = is_create_environment
    # python_command is not passed below, so the constructor default is expected.
    task_params["pythonCommand"] = "${PYTHON_HOME}"
    task_params["pythonEnvTool"] = python_env_tool
    task_params["requirements"] = requirements
    task_params["condaPythonVersion"] = conda_python_version

    with patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        return_value=(CODE, VERSION),
    ):
        task = Pytorch(
            name=name,
            script=script,
            script_params=script_params,
            project_path=project_path,
            is_create_environment=is_create_environment,
            python_env_tool=python_env_tool,
            requirements=requirements,
            # Passed explicitly so the assertion exercises the parameter
            # instead of relying on the default happening to match.
            conda_python_version=conda_python_version,
        )
        assert task.get_define() == expect


@pytest.mark.parametrize(
    "is_create_environment, project_path, python_command, expect",
    [
        # every tracked option at its default -> False
        (
            DEFAULT.is_create_environment,
            DEFAULT.project_path,
            DEFAULT.python_command,
            False,
        ),
        # exactly one tracked option changed -> True
        (True, DEFAULT.project_path, DEFAULT.python_command, True),
        (DEFAULT.is_create_environment, "/home", DEFAULT.python_command, True),
        (DEFAULT.is_create_environment, DEFAULT.project_path, "/usr/bin/python", True),
    ],
)
def test_other_params(is_create_environment, project_path, python_command, expect):
    """Check that ``other_params`` is True once any tracked option leaves its default."""
    with patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
    ):
        task = Pytorch(
            name="test",
            script="",
            script_params="",
            project_path=project_path,
            is_create_environment=is_create_environment,
            python_command=python_command,
        )
        assert task.other_params == expect