Browse Source

[feat][python] Support Pytorch task in python api (#11975)

Co-authored-by: Jiajie Zhong <zhongjiajie955@gmail.com>
(cherry picked from commit 5202e5cfc6)
3.1.0-release
JieguangZhou 2 years ago committed by Jiajie Zhong
parent
commit
6f3b4c1624
  1. 1
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
  2. 42
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst
  3. 53
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml
  4. 1
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  5. 62
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_pytorch_example.py
  6. 2
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
  7. 95
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/pytorch.py
  8. 124
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_pytorch.py

1
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst

@ -42,3 +42,4 @@ In this section
sub_process sub_process
sagemaker sagemaker
pytorch

42
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/pytorch.rst

@ -0,0 +1,42 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Pytorch
=======
A Pytorch task type's example and dive into information of **PyDolphinScheduler**.
Example
-------
.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_pytorch_example.py
:start-after: [start workflow_declare]
:end-before: [end workflow_declare]
Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.pytorch
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/Pytorch.yaml
:start-after: # under the License.
:language: yaml

53
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Pytorch.yaml

@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "Pytorch"
# Define the tasks under the workflow
tasks:
# run project with existing environment
- name: task_existing_env
task_type: pytorch
script: main.py
script_params: --dry-run --no-cuda
project_path: https://github.com/pytorch/examples#mnist
python_command: /home/anaconda3/envs/pytorch/bin/python3
# run project with creating conda environment
- name: task_conda_env
task_type: pytorch
script: main.py
script_params: --dry-run --no-cuda
project_path: https://github.com/pytorch/examples#mnist
is_create_environment: True
python_env_tool: conda
requirements: requirements.txt
conda_python_version: 3.7
# run project with creating virtualenv environment
- name: task_virtualenv_env
task_type: pytorch
script: main.py
script_params: --dry-run --no-cuda
project_path: https://github.com/pytorch/examples#mnist
is_create_environment: True
python_env_tool: virtualenv
requirements: requirements.txt

1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -58,6 +58,7 @@ class TaskType(str):
SPARK = "SPARK" SPARK = "SPARK"
MR = "MR" MR = "MR"
SAGEMAKER = "SAGEMAKER" SAGEMAKER = "SAGEMAKER"
PYTORCH = "PYTORCH"
class DefaultTaskCodeNum(str): class DefaultTaskCodeNum(str):

62
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_pytorch_example.py

@ -0,0 +1,62 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# [start workflow_declare]
"""A example workflow for task pytorch."""
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.pytorch import Pytorch
with ProcessDefinition(
name="task_pytorch_example",
tenant="tenant_exists",
) as pd:
# run project with existing environment
task_existing_env = Pytorch(
name="task_existing_env",
script="main.py",
script_params="--dry-run --no-cuda",
project_path="https://github.com/pytorch/examples#mnist",
python_command="/home/anaconda3/envs/pytorch/bin/python3",
)
# run project with creating conda environment
task_conda_env = Pytorch(
name="task_conda_env",
script="main.py",
script_params="--dry-run --no-cuda",
project_path="https://github.com/pytorch/examples#mnist",
is_create_environment=True,
python_env_tool="conda",
requirements="requirements.txt",
conda_python_version="3.7",
)
# run project with creating virtualenv environment
task_virtualenv_env = Pytorch(
name="task_virtualenv_env",
script="main.py",
script_params="--dry-run --no-cuda",
project_path="https://github.com/pytorch/examples#mnist",
is_create_environment=True,
python_env_tool="virtualenv",
requirements="requirements.txt",
)
pd.submit()
# [end workflow_declare]

2
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py

@ -25,6 +25,7 @@ from pydolphinscheduler.tasks.http import Http
from pydolphinscheduler.tasks.map_reduce import MR from pydolphinscheduler.tasks.map_reduce import MR
from pydolphinscheduler.tasks.procedure import Procedure from pydolphinscheduler.tasks.procedure import Procedure
from pydolphinscheduler.tasks.python import Python from pydolphinscheduler.tasks.python import Python
from pydolphinscheduler.tasks.pytorch import Pytorch
from pydolphinscheduler.tasks.sagemaker import SageMaker from pydolphinscheduler.tasks.sagemaker import SageMaker
from pydolphinscheduler.tasks.shell import Shell from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.spark import Spark from pydolphinscheduler.tasks.spark import Spark
@ -42,6 +43,7 @@ __all__ = [
"MR", "MR",
"Procedure", "Procedure",
"Python", "Python",
"Pytorch",
"Shell", "Shell",
"Spark", "Spark",
"Sql", "Sql",

95
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/pytorch.py

@ -0,0 +1,95 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task Pytorch."""
from typing import Optional
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
class DEFAULT:
"""Default values for Pytorch."""
is_create_environment = False
project_path = "."
python_command = "${PYTHON_HOME}"
class Pytorch(Task):
"""Task Pytorch object, declare behavior for Pytorch task to dolphinscheduler.
See also: `DolphinScheduler Pytorch Task Plugin
<https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/pytorch.html>`_
:param name: task name
:param script: Entry to the Python script file that you want to run.
:param script_params: Input parameters at run time.
:param project_path: The path to the project. Default "." .
:param is_create_environment: is create environment. Default False.
:param python_command: The path to the python command. Default "${PYTHON_HOME}".
:param python_env_tool: The python environment tool. Default "conda".
:param requirements: The path to the requirements.txt file. Default "requirements.txt".
:param conda_python_version: The python version of conda environment. Default "3.7".
"""
_task_custom_attr = {
"script",
"script_params",
"other_params",
"python_path",
"is_create_environment",
"python_command",
"python_env_tool",
"requirements",
"conda_python_version",
}
def __init__(
self,
name: str,
script: str,
script_params: str = "",
project_path: Optional[str] = DEFAULT.project_path,
is_create_environment: Optional[bool] = DEFAULT.is_create_environment,
python_command: Optional[str] = DEFAULT.python_command,
python_env_tool: Optional[str] = "conda",
requirements: Optional[str] = "requirements.txt",
conda_python_version: Optional[str] = "3.7",
*args,
**kwargs,
):
"""Init Pytorch task."""
super().__init__(name, TaskType.PYTORCH, *args, **kwargs)
self.script = script
self.script_params = script_params
self.is_create_environment = is_create_environment
self.python_path = project_path
self.python_command = python_command
self.python_env_tool = python_env_tool
self.requirements = requirements
self.conda_python_version = conda_python_version
@property
def other_params(self):
"""Return other params."""
conds = [
self.is_create_environment != DEFAULT.is_create_environment,
self.python_path != DEFAULT.project_path,
self.python_command != DEFAULT.python_command,
]
return any(conds)

124
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_pytorch.py

@ -0,0 +1,124 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test Task Pytorch."""
from copy import deepcopy
from unittest.mock import patch
import pytest
from pydolphinscheduler.tasks.pytorch import DEFAULT, Pytorch
from tests.testing.task import Task
CODE = 123
VERSION = 1
EXPECT = {
"code": CODE,
"version": VERSION,
"description": None,
"delayTime": 0,
"taskType": "PYTORCH",
"taskParams": {
"resourceList": [],
"localParams": [],
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
},
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0,
}
def test_pytorch_get_define():
"""Test task pytorch function get_define."""
name = "task_conda_env"
script = "main.py"
script_params = "--dry-run --no-cuda"
project_path = "https://github.com/pytorch/examples#mnist"
is_create_environment = True
python_env_tool = "conda"
requirements = "requirements.txt"
conda_python_version = "3.7"
expect = deepcopy(EXPECT)
expect["name"] = name
task_params = expect["taskParams"]
task_params["script"] = script
task_params["scriptParams"] = script_params
task_params["pythonPath"] = project_path
task_params["otherParams"] = True
task_params["isCreateEnvironment"] = is_create_environment
task_params["pythonCommand"] = "${PYTHON_HOME}"
task_params["pythonEnvTool"] = python_env_tool
task_params["requirements"] = requirements
task_params["condaPythonVersion"] = conda_python_version
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(CODE, VERSION),
):
task = Pytorch(
name=name,
script=script,
script_params=script_params,
project_path=project_path,
is_create_environment=is_create_environment,
python_env_tool=python_env_tool,
requirements=requirements,
)
assert task.get_define() == expect
@pytest.mark.parametrize(
"is_create_environment, project_path, python_command, expect",
[
(
DEFAULT.is_create_environment,
DEFAULT.project_path,
DEFAULT.python_command,
False,
),
(True, DEFAULT.project_path, DEFAULT.python_command, True),
(DEFAULT.is_create_environment, "/home", DEFAULT.python_command, True),
(DEFAULT.is_create_environment, DEFAULT.project_path, "/usr/bin/python", True),
],
)
def test_other_params(is_create_environment, project_path, python_command, expect):
"""Test task pytorch function other_params."""
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
):
task = Pytorch(
name="test",
script="",
script_params="",
project_path=project_path,
is_create_environment=is_create_environment,
python_command=python_command,
)
assert task.other_params == expect
Loading…
Cancel
Save