From 32d4411469902705f31171d25494e9586560e220 Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Fri, 5 Nov 2021 15:14:28 +0800 Subject: [PATCH] [python] Add flake8 and black for code style and integrated to GA (#6679) * [python] Add code style lint for GA * Change github action name * Auto change by black * Fix flake8 * Fix broken link for pyds README.md * Auto fix by black * Separate GitHub workflows * Add Black badge and CI locally in README.md --- .github/workflows/e2e.yml | 6 + .github/workflows/{py-tests.yml => py-ci.yml} | 33 ++++- .github/workflows/unit-test.yml | 2 + .../pydolphinscheduler/.flake8 | 37 +++++ .../pydolphinscheduler/README.md | 35 ++++- .../examples/bulk_create.py | 9 +- .../pydolphinscheduler/examples/tutorial.py | 5 + .../pydolphinscheduler/requirements_dev.txt | 7 +- .../pydolphinscheduler/setup.py | 10 +- .../src/pydolphinscheduler/__init__.py | 2 + .../src/pydolphinscheduler/constants.py | 25 +++- .../src/pydolphinscheduler/core/__init__.py | 2 + .../src/pydolphinscheduler/core/base.py | 27 ++-- .../src/pydolphinscheduler/core/base_side.py | 23 ++- .../core/process_definition.py | 137 ++++++++++++------ .../src/pydolphinscheduler/core/task.py | 128 ++++++++++------ .../src/pydolphinscheduler/java_gateway.py | 31 ++-- .../src/pydolphinscheduler/side/__init__.py | 2 + .../src/pydolphinscheduler/side/project.py | 21 ++- .../src/pydolphinscheduler/side/queue.py | 18 +-- .../src/pydolphinscheduler/side/tenant.py | 26 ++-- .../src/pydolphinscheduler/side/user.py | 30 ++-- .../pydolphinscheduler/side/worker_group.py | 13 +- .../src/pydolphinscheduler/tasks/__init__.py | 2 + .../src/pydolphinscheduler/tasks/shell.py | 18 ++- .../src/pydolphinscheduler/utils/__init__.py | 2 + .../src/pydolphinscheduler/utils/string.py | 6 + .../pydolphinscheduler/tests/__init__.py | 2 + .../pydolphinscheduler/tests/core/__init__.py | 2 + .../tests/core/test_process_definition.py | 71 ++++++--- .../tests/core/test_task.py | 36 +++-- .../tests/tasks/__init__.py | 2 
+ .../tests/tasks/test_shell.py | 21 ++- .../tests/test_java_gateway.py | 6 + .../tests/testing/__init__.py | 2 + .../pydolphinscheduler/tests/testing/task.py | 5 + 36 files changed, 534 insertions(+), 270 deletions(-) rename .github/workflows/{py-tests.yml => py-ci.yml} (69%) create mode 100644 dolphinscheduler-python/pydolphinscheduler/.flake8 diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index dfa916a0fc..244b0e5fb3 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -17,7 +17,13 @@ on: pull_request: + paths-ignore: + - '**/*.md' + - 'dolphinscheduler-python/pydolphinscheduler' push: + paths-ignore: + - '**/*.md' + - 'dolphinscheduler-python/pydolphinscheduler' branches: - dev diff --git a/.github/workflows/py-tests.yml b/.github/workflows/py-ci.yml similarity index 69% rename from .github/workflows/py-tests.yml rename to .github/workflows/py-ci.yml index e37cab3d85..5b8e42a272 100644 --- a/.github/workflows/py-tests.yml +++ b/.github/workflows/py-ci.yml @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -name: Python API Tests +name: Python API on: push: @@ -30,14 +30,41 @@ defaults: working-directory: dolphinscheduler-python/pydolphinscheduler jobs: - build: + sanity: + name: Sanity + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - name: Sanity Check + uses: ./.github/actions/sanity-check + lint: + name: Code Style + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.7 + uses: actions/setup-python@v2 + with: + python-version: 3.7 + - name: Install Development Dependences + run: pip install -r requirements_dev.txt + - name: Run Black Checking + run: black --check . 
+ - name: Run Flake8 Checking + run: flake8 + pytest: + name: Pytest + needs: + - lint + - sanity runs-on: ${{ matrix.os }} strategy: fail-fast: false matrix: python-version: [3.6, 3.7, 3.8, 3.9] os: [ubuntu-18.04, macOS-latest, windows-latest] - steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 3087806894..5a1056c781 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -22,10 +22,12 @@ on: paths-ignore: - '**/*.md' - 'dolphinscheduler-ui' + - 'dolphinscheduler-python/pydolphinscheduler' push: paths-ignore: - '**/*.md' - 'dolphinscheduler-ui' + - 'dolphinscheduler-python/pydolphinscheduler' branches: - dev diff --git a/dolphinscheduler-python/pydolphinscheduler/.flake8 b/dolphinscheduler-python/pydolphinscheduler/.flake8 new file mode 100644 index 0000000000..e676972c22 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/.flake8 @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[flake8] +max-line-length = 110 +exclude = + .git, + __pycache__, + .pytest_cache, + *.egg-info, + docs/source/conf.py + old, + build, + dist, + htmlcov +ignore = + # It's clear and not need to add docstring + D107, # D107: Don't require docstrings on __init__ + D105, # D105: Missing docstring in magic method + # Conflict to Black + W503 # W503: Line breaks before binary operators +per-file-ignores = + src/pydolphinscheduler/side/__init__.py:F401 diff --git a/dolphinscheduler-python/pydolphinscheduler/README.md b/dolphinscheduler-python/pydolphinscheduler/README.md index a6609844e9..0cc36d76a3 100644 --- a/dolphinscheduler-python/pydolphinscheduler/README.md +++ b/dolphinscheduler-python/pydolphinscheduler/README.md @@ -20,6 +20,7 @@ # pydolphinscheduler [![GitHub Build][ga-py-test]][ga] +[![Code style: black][black-shield]][black-gh] pydolphinscheduler is python API for Apache DolphinScheduler, which allow you definition your workflow by python code, aka workflow-as-codes. @@ -39,7 +40,7 @@ git clone git@github.com:apache/dolphinscheduler.git # Install pydolphinscheduler from source cd dolphinscheduler-python/pydolphinscheduler -pip setup.py install +pip install -e . ``` ### Start Server And Run Example @@ -77,6 +78,12 @@ We already clone the code in [quick start](#quick-start), so next step we have t in you editor. We recommend you use [pycharm][pycharm] instead of [IntelliJ IDEA][idea] to open it. And you could just open directory `dolphinscheduler-python/pydolphinscheduler` instead of `dolphinscheduler-python`. +Then you should add developer dependence to make sure you could run test and check code style locally + +```shell +pip install -r requirements_dev.txt +``` + ### Brief Concept Apache DolphinScheduler is design to define workflow by UI, and pydolphinscheduler try to define it by code. When @@ -95,6 +102,25 @@ pydolphinscheduler tasks object, we use tasks to define exact job we want Dolphi we only support `shell` task to execute shell task. 
[This link][all-task] list all tasks support in DolphinScheduler and would be implemented in the further. +### Code Style + +We use [Black][black] for code formatter and [Flake8][flake8] for pep8 checker. If you use [pycharm][pycharm] +or [IntelliJ IDEA][idea], maybe you could follow [Black-integration][black-editor] to configure them in your environment. + +Our Python API CI would automatically run unittest when you submit pull request in GitHub, you could also run +static check locally. + +```shell +# We recommend you run Black before Flake8, because Black could auto fix some code style issue +# but Flake8 just hint when code style not match pep8 + +# Run Black +black . + +# Run Flake8 +flake8 +``` + ### Testing pydolphinscheduler using [pytest][pytest] to test our codebase. GitHub Action will run our test when you create @@ -115,6 +141,11 @@ PYTHONPATH=src/ pytest [idea]: https://www.jetbrains.com/idea/ [all-task]: https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/shell.html [pytest]: https://docs.pytest.org/en/latest/ +[black]: https://black.readthedocs.io/en/stable/index.html +[flake8]: https://flake8.pycqa.org/en/latest/index.html +[black-editor]: https://black.readthedocs.io/en/stable/integrations/editors.html#pycharm-intellij-idea -[ga-py-test]: https://github.com/apache/dolphinscheduler/actions/workflows/py-tests.yml/badge.svg?branch=dev +[ga-py-test]: https://github.com/apache/dolphinscheduler/actions/workflows/py-ci.yml/badge.svg?branch=dev [ga]: https://github.com/apache/dolphinscheduler/actions +[black-shield]: https://img.shields.io/badge/code%20style-black-000000.svg +[black-gh]: https://github.com/psf/black diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py b/dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py index 74203a17c2..72bdb02243 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py +++ 
b/dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py @@ -16,11 +16,14 @@ # under the License. """ -This example show you how to create workflows in batch mode. After this example run, we will create 10 -workflows named `workflow:`, and with 3 tasks named `task:-workflow:` -in each workflow. Each workflow is linear shape as below, since we set `IS_CHAIN=True` +This example show you how to create workflows in batch mode. + +After this example run, we will create 10 workflows named `workflow:`, and with 3 tasks +named `task:-workflow:` in each workflow. Task shape as below task:1-workflow:1 -> task:2-workflow:1 -> task:3-workflow:1 + +Each workflow is linear since we set `IS_CHAIN=True`, you could change task to parallel by set it to `False`. """ from pydolphinscheduler.core.process_definition import ProcessDefinition diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py index 775653448b..c223836549 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py +++ b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py @@ -16,6 +16,8 @@ # under the License. r""" +A tutorial example take you to experience pydolphinscheduler. + After tutorial.py file submit to Apache DolphinScheduler server a DAG would be create, and workflow DAG graph as below: @@ -24,11 +26,14 @@ and workflow DAG graph as below: task_parent --> --> task_union \ / --> task_child_two + +it will instantiate and run all the task it have. 
""" from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.tasks.shell import Shell + with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd: task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler") task_child_one = Shell(name="task_child_one", command="echo 'child one'") diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt index be98ce9658..2c3d409f7b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt +++ b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt @@ -18,7 +18,6 @@ # testting pytest~=6.2.5 # code linting and formatting -flake8-black~=0.2.3 -# flake8 -# flake8-docstrings -# flake8-black +flake8 +flake8-docstrings +flake8-black diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py b/dolphinscheduler-python/pydolphinscheduler/setup.py index 8e9cea44e9..4a6c045458 100644 --- a/dolphinscheduler-python/pydolphinscheduler/setup.py +++ b/dolphinscheduler-python/pydolphinscheduler/setup.py @@ -15,19 +15,23 @@ # specific language governing permissions and limitations # under the License. +"""The script for setting up pydolphinscheduler.""" import sys from os.path import dirname, join from setuptools import find_packages, setup -version = '0.0.1.dev0' +version = "0.0.1.dev0" if sys.version_info[0] < 3: - raise Exception("pydolphinscheduler does not support Python 2. Please upgrade to Python 3.") + raise Exception( + "pydolphinscheduler does not support Python 2. Please upgrade to Python 3." 
+ ) def read(*names, **kwargs): + """Read file content from given file path.""" return open( join(dirname(__file__), *names), encoding=kwargs.get("encoding", "utf8") ).read() @@ -86,5 +90,5 @@ setup( "py4j~=0.10", # Dev "pytest~=6.2", - ] + ], ) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py index 13a83393a9..701b4cccf1 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py @@ -14,3 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + +"""Init root of pydolphinscheduler.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index eda07aa3e6..bdf0d9cf7f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -15,18 +15,19 @@ # specific language governing permissions and limitations # under the License. 
+"""Constants for pydolphinscheduler.""" + + class ProcessDefinitionReleaseState: - """ - ProcessDefinition release state - """ + """Constants for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` release state.""" + ONLINE: str = "ONLINE" OFFLINE: str = "OFFLINE" class ProcessDefinitionDefault: - """ - ProcessDefinition default values - """ + """Constants default value for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition`.""" + PROJECT: str = "project-pydolphin" TENANT: str = "tenant_pydolphin" USER: str = "userPythonGateway" @@ -40,6 +41,8 @@ class ProcessDefinitionDefault: class TaskPriority(str): + """Constants for task priority.""" + HIGHEST = "HIGHEST" HIGH = "HIGH" MEDIUM = "MEDIUM" @@ -48,23 +51,33 @@ class TaskPriority(str): class TaskFlag(str): + """Constants for task flag.""" + YES = "YES" NO = "NO" class TaskTimeoutFlag(str): + """Constants for task timeout flag.""" + CLOSE = "CLOSE" class TaskType(str): + """Constants for task type, it will also show you which kind we support up to now.""" + SHELL = "SHELL" class DefaultTaskCodeNum(str): + """Constants and default value for default task code number.""" + DEFAULT = 1 class JavaGatewayDefault(str): + """Constants and default value for java gateway.""" + RESULT_MESSAGE_KEYWORD = "msg" RESULT_MESSAGE_SUCCESS = "success" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py index 13a83393a9..3fbddf312b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py @@ -14,3 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+ +"""Init pydolphinscheduler.core package.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py index 175754fc65..ce71a4a064 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +"""DolphinScheduler Base object.""" + from typing import Optional, Dict # from pydolphinscheduler.side.user import User @@ -22,24 +24,15 @@ from pydolphinscheduler.utils.string import attr2camel class Base: - """ - Base - """ + """DolphinScheduler Base object.""" - _KEY_ATTR: set = { - "name", - "description" - } + _KEY_ATTR: set = {"name", "description"} _TO_DICT_ATTR: set = set() DEFAULT_ATTR: Dict = {} - def __init__( - self, - name: str, - description: Optional[str] = None - ): + def __init__(self, name: str, description: Optional[str] = None): self.name = name self.description = description @@ -47,12 +40,18 @@ class Base: return f'<{type(self).__name__}: name="{self.name}">' def __eq__(self, other): - return type(self) == type(other) and \ - all(getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR) + return type(self) == type(other) and all( + getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR + ) # TODO check how Redash do # TODO DRY def to_dict(self, camel_attr=True) -> Dict: + """Get object key attribute dict. + + use attribute `self._TO_DICT_ATTR` to determine which attributes should including to + children `to_dict` function. 
+ """ # content = {} # for attr, value in self.__dict__.items(): # # Don't publish private variables diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py index cf0f14e4af..ed20d7075e 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +"""Module for side object.""" + from typing import Optional from pydolphinscheduler.constants import ProcessDefinitionDefault @@ -22,22 +24,17 @@ from pydolphinscheduler.core.base import Base class BaseSide(Base): - def __init__( - self, - name: str, - description: Optional[str] = None - ): + """Base class for side object, it declare base behavior for them.""" + + def __init__(self, name: str, description: Optional[str] = None): super().__init__(name, description) @classmethod def create_if_not_exists( - cls, - # TODO comment for avoiding cycle import - # user: Optional[User] = ProcessDefinitionDefault.USER - user=ProcessDefinitionDefault.USER + cls, + # TODO comment for avoiding cycle import + # user: Optional[User] = ProcessDefinitionDefault.USER + user=ProcessDefinitionDefault.USER, ): - """ - Create Base if not exists - """ - + """Create Base if not exists.""" raise NotImplementedError diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index fa6ad97a9a..500f2d2380 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -14,36 +14,46 @@ # KIND, either express or implied. 
See the License for the # specific language governing permissions and limitations # under the License. + +"""Module process definition, core class for workflow define.""" + import json from typing import Optional, List, Dict, Set -from pydolphinscheduler.constants import ProcessDefinitionReleaseState, ProcessDefinitionDefault +from pydolphinscheduler.constants import ( + ProcessDefinitionReleaseState, + ProcessDefinitionDefault, +) from pydolphinscheduler.core.base import Base from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.side import Tenant, Project, User class ProcessDefinitionContext: + """Class process definition context, use when task get process definition from context expression.""" + _context_managed_process_definition: Optional["ProcessDefinition"] = None @classmethod def set(cls, pd: "ProcessDefinition") -> None: + """Set attribute self._context_managed_process_definition.""" cls._context_managed_process_definition = pd @classmethod def get(cls) -> Optional["ProcessDefinition"]: + """Get attribute self._context_managed_process_definition.""" return cls._context_managed_process_definition @classmethod def delete(cls) -> None: + """Delete attribute self._context_managed_process_definition.""" cls._context_managed_process_definition = None class ProcessDefinition(Base): - """ - ProcessDefinition - TODO :ref: comment may not correct ref - TODO: maybe we should rename this class, currently use DS object name + """process definition object, will define process definition attribute, task, relation. + + TODO: maybe we should rename this class, currently use DS object name. 
""" # key attribute for identify ProcessDefinition object @@ -70,17 +80,17 @@ class ProcessDefinition(Base): } def __init__( - self, - name: str, - description: Optional[str] = None, - user: Optional[str] = ProcessDefinitionDefault.USER, - project: Optional[str] = ProcessDefinitionDefault.PROJECT, - tenant: Optional[str] = ProcessDefinitionDefault.TENANT, - queue: Optional[str] = ProcessDefinitionDefault.QUEUE, - worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP, - timeout: Optional[int] = 0, - release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE, - param: Optional[List] = None + self, + name: str, + description: Optional[str] = None, + user: Optional[str] = ProcessDefinitionDefault.USER, + project: Optional[str] = ProcessDefinitionDefault.PROJECT, + tenant: Optional[str] = ProcessDefinitionDefault.TENANT, + queue: Optional[str] = ProcessDefinitionDefault.QUEUE, + worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP, + timeout: Optional[int] = 0, + release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE, + param: Optional[List] = None, ): super().__init__(name, description) self._user = user @@ -93,7 +103,7 @@ class ProcessDefinition(Base): self.param = param self.tasks: dict = {} # TODO how to fix circle import - self._task_relations: set["TaskRelation"] = set() + self._task_relations: set["TaskRelation"] = set() # noqa: F821 self._process_definition_code = None def __enter__(self) -> "ProcessDefinition": @@ -105,32 +115,43 @@ class ProcessDefinition(Base): @property def tenant(self) -> Tenant: + """Get attribute tenant.""" return Tenant(self._tenant) @tenant.setter def tenant(self, tenant: Tenant) -> None: + """Set attribute tenant.""" self._tenant = tenant.name @property def project(self) -> Project: + """Get attribute project.""" return Project(self._project) @project.setter def project(self, project: Project) -> None: + """Set attribute project.""" self._project = project.name @property def user(self) 
-> User: - return User(self._user, - ProcessDefinitionDefault.USER_PWD, - ProcessDefinitionDefault.USER_EMAIL, - ProcessDefinitionDefault.USER_PHONE, - ProcessDefinitionDefault.TENANT, - ProcessDefinitionDefault.QUEUE, - ProcessDefinitionDefault.USER_STATE) + """Get user object. + + For now we just get from python side but not from java gateway side, so it may not correct. + """ + return User( + self._user, + ProcessDefinitionDefault.USER_PWD, + ProcessDefinitionDefault.USER_EMAIL, + ProcessDefinitionDefault.USER_PHONE, + ProcessDefinitionDefault.TENANT, + ProcessDefinitionDefault.QUEUE, + ProcessDefinitionDefault.USER_STATE, + ) @property def task_definition_json(self) -> List[Dict]: + """Return all tasks definition in list of dict.""" if not self.tasks: return [self.tasks] else: @@ -138,26 +159,39 @@ class ProcessDefinition(Base): @property def task_relation_json(self) -> List[Dict]: + """Return all relation between tasks pair in list of dict.""" if not self.tasks: return [self.tasks] else: self._handle_root_relation() return [tr.to_dict() for tr in self._task_relations] - # TODO inti DAG's tasks are in the same place + # TODO inti DAG's tasks are in the same location with default {x: 0, y: 0} @property def task_location(self) -> List[Dict]: + """Return all tasks location for all process definition. + + For now, we only set all location with same x and y valued equal to 0. Because we do not + find a good way to set task locations. This is requests from java gateway interface. + """ if not self.tasks: return [self.tasks] else: return [{"taskCode": task_code, "x": 0, "y": 0} for task_code in self.tasks] @property - def task_list(self) -> List["Task"]: + def task_list(self) -> List["Task"]: # noqa: F821 + """Return list of tasks objects.""" return list(self.tasks.values()) def _handle_root_relation(self): + """Handle root task property :class:`pydolphinscheduler.core.task.TaskRelation`. 
+ + Root task in DAG do not have dominant upstream node, but we have to add an exactly default + upstream task with task_code equal to `0`. This is requests from java gateway interface. + """ from pydolphinscheduler.core.task import TaskRelation + post_relation_code = set() for relation in self._task_relations: post_relation_code.add(relation.post_task_code) @@ -166,46 +200,62 @@ class ProcessDefinition(Base): root_relation = TaskRelation(pre_task_code=0, post_task_code=task.code) self._task_relations.add(root_relation) - def add_task(self, task: "Task") -> None: + def add_task(self, task: "Task") -> None: # noqa: F821 + """Add a single task to process definition.""" self.tasks[task.code] = task task._process_definition = self - def add_tasks(self, tasks: List["Task"]) -> None: + def add_tasks(self, tasks: List["Task"]) -> None: # noqa: F821 + """Add task sequence to process definition, it a wrapper of :func:`add_task`.""" for task in tasks: self.add_task(task) - def get_task(self, code: str) -> "Task": + def get_task(self, code: str) -> "Task": # noqa: F821 + """Get task object from process definition by given code.""" if code not in self.tasks: - raise ValueError("Task with code %s can not found in process definition %", (code, self.name)) + raise ValueError( + "Task with code %s can not found in process definition %", + (code, self.name), + ) return self.tasks[code] # TODO which tying should return in this case - def get_tasks_by_name(self, name: str) -> Set["Task"]: + def get_tasks_by_name(self, name: str) -> Set["Task"]: # noqa: F821 + """Get tasks object by given name, if will return all tasks with this name.""" find = set() for task in self.tasks.values(): if task.name == name: find.add(task) return find - def get_one_task_by_name(self, name: str) -> "Task": + def get_one_task_by_name(self, name: str) -> "Task": # noqa: F821 + """Get exact one task from process definition by given name. 
+ + Function always return one task even though this process definition have more than one task with + this name. + """ tasks = self.get_tasks_by_name(name) if not tasks: raise ValueError(f"Can not find task with name {name}.") return tasks.pop() def run(self): - """ - Run ProcessDefinition instance, a shortcut for :ref: submit and :ref: start - Only support manual for now, schedule run will coming soon + """Submit and Start ProcessDefinition instance. + + Shortcut for function :func:`submit` and function :func:`start`. Only support manual start workflow + for now, and schedule run will coming soon. :return: """ self.submit() self.start() def _ensure_side_model_exists(self): - """ - Ensure side model exists which including :ref: Project, Tenant, User. - If those model not exists, would create default value in :ref: ProcessDefinitionDefault + """Ensure process definition side model exists. + + For now, side object including :class:`pydolphinscheduler.side.project.Project`, + :class:`pydolphinscheduler.side.tenant.Tenant`, :class:`pydolphinscheduler.side.user.User`. + If these model not exists, would create default value in + :class:`pydolphinscheduler.constants.ProcessDefinitionDefault`. """ # TODO used metaclass for more pythonic self.tenant.create_if_not_exists(self._queue) @@ -215,10 +265,7 @@ class ProcessDefinition(Base): self.project.create_if_not_exists(self._user) def submit(self) -> int: - """ - Submit ProcessDefinition instance to java gateway - :return: - """ + """Submit ProcessDefinition instance to java gateway.""" self._ensure_side_model_exists() gateway = launch_gateway() self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition( @@ -238,9 +285,9 @@ class ProcessDefinition(Base): return self._process_definition_code def start(self) -> None: - """ - Start ProcessDefinition instance which post to `start-process-instance` to java gateway - :return: + """Create and start ProcessDefinition instance. 
+ + which post to `start-process-instance` to java gateway """ gateway = launch_gateway() gateway.entry_point.execProcessInstance( diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 51ad74bed6..6f9e454ef0 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -15,18 +15,26 @@ # specific language governing permissions and limitations # under the License. +"""DolphinScheduler ObjectJsonBase, TaskParams and Task object.""" + from typing import Optional, List, Dict, Set, Union, Sequence, Tuple -from pydolphinscheduler.constants import TaskPriority, ProcessDefinitionDefault, TaskFlag, TaskTimeoutFlag, \ - DefaultTaskCodeNum, JavaGatewayDefault +from pydolphinscheduler.constants import ( + TaskPriority, + ProcessDefinitionDefault, + TaskFlag, + TaskTimeoutFlag, +) from pydolphinscheduler.core.base import Base from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.process_definition import ProcessDefinitionContext -from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker +from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.utils.string import snake2camel, class_name2camel class ObjectJsonBase: + """Task base class, define `__str__` and `to_dict` function would be use in other task related class.""" + DEFAULT_ATTR = {} def __int__(self, *args, **kwargs): @@ -35,36 +43,32 @@ class ObjectJsonBase: def __str__(self) -> str: content = [] for attribute, value in self.__dict__.items(): - content.append(f"\"{snake2camel(attribute)}\": {value}") + content.append(f'"{snake2camel(attribute)}": {value}') content = ",".join(content) - return f"\"{class_name2camel(type(self).__name__)}\":{{{content}}}" + return 
f'"{class_name2camel(type(self).__name__)}":{{{content}}}' # TODO check how Redash do # TODO DRY def to_dict(self) -> Dict: + """Get object key attribute dict which determine by attribute `DEFAULT_ATTR`.""" content = {snake2camel(attr): value for attr, value in self.__dict__.items()} content.update(self.DEFAULT_ATTR) return content class TaskParams(ObjectJsonBase): - DEFAULT_CONDITION_RESULT = { - "successNode": [ - "" - ], - "failedNode": [ - "" - ] - } + """TaskParams object, describe the key parameter of a single task.""" + + DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]} def __init__( - self, - raw_script: str, - local_params: Optional[List] = None, - resource_list: Optional[List] = None, - dependence: Optional[Dict] = None, - wait_start_timeout: Optional[Dict] = None, - condition_result: Optional[Dict] = None, + self, + raw_script: str, + local_params: Optional[List] = None, + resource_list: Optional[List] = None, + dependence: Optional[Dict] = None, + wait_start_timeout: Optional[Dict] = None, + condition_result: Optional[Dict] = None, ): super().__init__() self.raw_script = raw_script @@ -77,18 +81,20 @@ class TaskParams(ObjectJsonBase): class TaskRelation(ObjectJsonBase): + """TaskRelation object, describe the relation of exactly two tasks.""" + DEFAULT_ATTR = { "name": "", "preTaskVersion": 1, "postTaskVersion": 1, "conditionType": 0, - "conditionParams": {} + "conditionParams": {}, } def __init__( - self, - pre_task_code: int, - post_task_code: int, + self, + pre_task_code: int, + post_task_code: int, ): super().__init__() self.pre_task_code = pre_task_code @@ -99,31 +105,32 @@ class TaskRelation(ObjectJsonBase): class Task(Base): + """Task object, parent class for all exactly task type.""" DEFAULT_DEPS_ATTR = { "name": "", "preTaskVersion": 1, "postTaskVersion": 1, "conditionType": 0, - "conditionParams": {} + "conditionParams": {}, } def __init__( - self, - name: str, - task_type: str, - task_params: TaskParams, - description: 
Optional[str] = None, - flag: Optional[str] = TaskFlag.YES, - task_priority: Optional[str] = TaskPriority.MEDIUM, - worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP, - delay_time: Optional[int] = 0, - fail_retry_times: Optional[int] = 0, - fail_retry_interval: Optional[int] = 1, - timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE, - timeout_notify_strategy: Optional = None, - timeout: Optional[int] = 0, - process_definition: Optional[ProcessDefinition] = None, + self, + name: str, + task_type: str, + task_params: TaskParams, + description: Optional[str] = None, + flag: Optional[str] = TaskFlag.YES, + task_priority: Optional[str] = TaskPriority.MEDIUM, + worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP, + delay_time: Optional[int] = 0, + fail_retry_times: Optional[int] = 0, + fail_retry_interval: Optional[int] = 1, + timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE, + timeout_notify_strategy: Optional = None, + timeout: Optional[int] = 0, + process_definition: Optional[ProcessDefinition] = None, ): super().__init__(name, description) @@ -139,48 +146,62 @@ class Task(Base): self.timeout_notify_strategy = timeout_notify_strategy self.timeout = timeout self._process_definition = None - self.process_definition: ProcessDefinition = process_definition or ProcessDefinitionContext.get() + self.process_definition: ProcessDefinition = ( + process_definition or ProcessDefinitionContext.get() + ) self._upstream_task_codes: Set[int] = set() self._downstream_task_codes: Set[int] = set() self._task_relation: Set[TaskRelation] = set() # move attribute code and version after _process_definition and process_definition declare self.code, self.version = self.gen_code_and_version() # Add task to process definition, maybe we could put into property process_definition latter - if self.process_definition is not None and self.code not in self.process_definition.tasks: + if ( + self.process_definition is not None + and self.code not in 
self.process_definition.tasks + ): self.process_definition.add_task(self) @property def process_definition(self) -> Optional[ProcessDefinition]: + """Get attribute process_definition.""" return self._process_definition @process_definition.setter def process_definition(self, process_definition: Optional[ProcessDefinition]): + """Set attribute process_definition.""" self._process_definition = process_definition def __hash__(self): return hash(self.code) def __lshift__(self, other: Union["Task", Sequence["Task"]]): - """Implements Task << Task""" + """Implement Task << Task.""" self.set_upstream(other) return other def __rshift__(self, other: Union["Task", Sequence["Task"]]): - """Implements Task >> Task""" + """Implement Task >> Task.""" self.set_downstream(other) return other def __rrshift__(self, other: Union["Task", Sequence["Task"]]): - """Called for Task >> [Task] because list don't have __rshift__ operators.""" + """Call for Task >> [Task] because list don't have __rshift__ operators.""" self.__lshift__(other) return self def __rlshift__(self, other: Union["Task", Sequence["Task"]]): - """Called for Task << [Task] because list don't have __lshift__ operators.""" + """Call for Task << [Task] because list don't have __lshift__ operators.""" self.__rshift__(other) return self - def _set_deps(self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True) -> None: + def _set_deps( + self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True + ) -> None: + """ + Set parameter tasks dependent to current task. + + it is a wrapper for :func:`set_upstream` and :func:`set_downstream`. 
+ """ if not isinstance(tasks, Sequence): tasks = [tasks] @@ -207,21 +228,32 @@ class Task(Base): self.process_definition._task_relations.add(task_relation) def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None: + """Set parameter tasks as upstream to current task.""" self._set_deps(tasks, upstream=True) def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None: + """Set parameter tasks as downstream to current task.""" self._set_deps(tasks, upstream=False) # TODO code should better generate in bulk mode when :ref: processDefinition run submit or start def gen_code_and_version(self) -> Tuple: + """ + Generate task code and version from java gateway. + + If task name does not exist in process definition before, it will generate new code and version id + equal to 0 by java gateway, otherwise it will return the existing code and version. + """ # TODO get code from specific project process definition and task name gateway = launch_gateway() - result = gateway.entry_point.getCodeAndVersion(self.process_definition._project, self.name) + result = gateway.entry_point.getCodeAndVersion( + self.process_definition._project, self.name + ) # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT) # gateway_result_checker(result) return result.get("code"), result.get("version") def to_dict(self, camel_attr=True) -> Dict: + """Task `to_dict` function which will return key attribute for Task object.""" content = {} for attr, value in self.__dict__.items(): # Don't publish private variables diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py index e93e8f1fb4..027ca94bc4 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py @@ -15,6 +15,8 @@ # specific language governing permissions and
limitations # under the License. +"""Module java gateway, contain gateway behavior.""" + from typing import Any, Optional from py4j.java_collections import JavaMap @@ -24,20 +26,29 @@ from pydolphinscheduler.constants import JavaGatewayDefault def launch_gateway() -> JavaGateway: - # TODO Note that automatic conversion makes calling Java methods slightly less efficient because - # in the worst case, Py4J needs to go through all registered converters for all parameters. - # This is why automatic conversion is disabled by default. + """Launch java gateway to pydolphinscheduler. + + TODO Note that automatic conversion makes calling Java methods slightly less efficient because + in the worst case, Py4J needs to go through all registered converters for all parameters. + This is why automatic conversion is disabled by default. + """ gateway = JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True)) return gateway def gateway_result_checker( - result: JavaMap, - msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS + result: JavaMap, + msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS, ) -> Any: - if result[JavaGatewayDefault.RESULT_STATUS_KEYWORD].toString() != \ - JavaGatewayDefault.RESULT_STATUS_SUCCESS: - raise RuntimeError(f"Failed when try to got result for java gateway") - if msg_check is not None and result[JavaGatewayDefault.RESULT_MESSAGE_KEYWORD] != msg_check: - raise ValueError(f"Get result state not success.") + """Check whether java gateway result success or not.""" + if ( + result[JavaGatewayDefault.RESULT_STATUS_KEYWORD].toString() + != JavaGatewayDefault.RESULT_STATUS_SUCCESS + ): + raise RuntimeError("Failed when try to got result for java gateway") + if ( + msg_check is not None + and result[JavaGatewayDefault.RESULT_MESSAGE_KEYWORD] != msg_check + ): + raise ValueError("Get result state not success.") return result diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py index 2f376a5a01..de5188cc51 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +"""Init Side package, Side package keep object related to DolphinScheduler but not in the Core part.""" + from pydolphinscheduler.side.project import Project from pydolphinscheduler.side.tenant import Tenant from pydolphinscheduler.side.user import User diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py index b118be9994..96051a2941 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py @@ -15,31 +15,28 @@ # specific language governing permissions and limitations # under the License. 
+"""DolphinScheduler Project object.""" + from typing import Optional from pydolphinscheduler.core.base_side import BaseSide from pydolphinscheduler.constants import ProcessDefinitionDefault -from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker +from pydolphinscheduler.java_gateway import launch_gateway class Project(BaseSide): - """ - Project - """ + """DolphinScheduler Project object.""" def __init__( - self, - name: str = ProcessDefinitionDefault.PROJECT, - description: Optional[str] = None + self, + name: str = ProcessDefinitionDefault.PROJECT, + description: Optional[str] = None, ): super().__init__(name, description) def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None: - """ - Create Project if not exists - """ + """Create Project if not exists.""" gateway = launch_gateway() - result = gateway.entry_point.createProject(user, self.name, self.description) + gateway.entry_point.createProject(user, self.name, self.description) # TODO recover result checker # gateway_result_checker(result, None) - diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py index 4c0d1f6010..720135186c 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. 
+"""DolphinScheduler Queue object.""" + from typing import Optional from pydolphinscheduler.constants import ProcessDefinitionDefault @@ -23,22 +25,18 @@ from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_check class Queue(BaseSide): - """ - Queue - """ + """DolphinScheduler Queue object.""" def __init__( - self, - name: str = ProcessDefinitionDefault.QUEUE, - description: Optional[str] = "" + self, + name: str = ProcessDefinitionDefault.QUEUE, + description: Optional[str] = "", ): super().__init__(name, description) def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None: - """ - Create Queue if not exists - """ + """Create Queue if not exists.""" gateway = launch_gateway() # Here we set Queue.name and Queue.queueName same as self.name result = gateway.entry_point.createProject(user, self.name, self.name) - gateway_result_checker(result, None) \ No newline at end of file + gateway_result_checker(result, None) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py index 9cba5331a4..508c033102 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py @@ -15,31 +15,31 @@ # specific language governing permissions and limitations # under the License.
+"""DolphinScheduler Tenant object.""" + from typing import Optional from pydolphinscheduler.constants import ProcessDefinitionDefault from pydolphinscheduler.core.base_side import BaseSide -from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker +from pydolphinscheduler.java_gateway import launch_gateway class Tenant(BaseSide): - """ - Tenant - """ + """DolphinScheduler Tenant object.""" def __init__( - self, - name: str = ProcessDefinitionDefault.TENANT, - queue: str = ProcessDefinitionDefault.QUEUE, - description: Optional[str] = None + self, + name: str = ProcessDefinitionDefault.TENANT, + queue: str = ProcessDefinitionDefault.QUEUE, + description: Optional[str] = None, ): super().__init__(name, description) self.queue = queue - def create_if_not_exists(self, queue_name: str, user=ProcessDefinitionDefault.USER) -> None: - """ - Create Tenant if not exists - """ + def create_if_not_exists( + self, queue_name: str, user=ProcessDefinitionDefault.USER + ) -> None: + """Create Tenant if not exists.""" gateway = launch_gateway() - result = gateway.entry_point.createTenant(self.name, self.description, queue_name) + gateway.entry_point.createTenant(self.name, self.description, queue_name) # gateway_result_checker(result, None) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py index fc7c3392ee..cd0145aea7 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py @@ -15,13 +15,17 @@ # specific language governing permissions and limitations # under the License. 
+"""DolphinScheduler User object.""" + from typing import Optional from pydolphinscheduler.core.base_side import BaseSide -from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker +from pydolphinscheduler.java_gateway import launch_gateway class User(BaseSide): + """DolphinScheduler User object.""" + _KEY_ATTR = { "name", "password", @@ -33,14 +37,14 @@ class User(BaseSide): } def __init__( - self, - name: str, - password: str, - email: str, - phone: str, - tenant: str, - queue: Optional[str] = None, - status: Optional[int] = 1, + self, + name: str, + password: str, + email: str, + phone: str, + tenant: str, + queue: Optional[str] = None, + status: Optional[int] = 1, ): super().__init__(name) self.password = password @@ -51,18 +55,16 @@ class User(BaseSide): self.status = status def create_if_not_exists(self, **kwargs): - """ - Create User if not exists - """ + """Create User if not exists.""" gateway = launch_gateway() - result = gateway.entry_point.createUser( + gateway.entry_point.createUser( self.name, self.password, self.email, self.phone, self.tenant, self.queue, - self.status + self.status, ) # TODO recover result checker # gateway_result_checker(result, None) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py index d4b1bb46cf..ed50ec6630 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py @@ -15,21 +15,16 @@ # specific language governing permissions and limitations # under the License. 
+"""DolphinScheduler Worker Group object.""" + from typing import Optional from pydolphinscheduler.core.base_side import BaseSide class WorkerGroup(BaseSide): - """ - Worker Group - """ + """DolphinScheduler Worker Group object.""" - def __init__( - self, - name: str, - address: str, - description: Optional[str] = None - ): + def __init__(self, name: str, address: str, description: Optional[str] = None): super().__init__(name, description) self.address = address diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py index 13a83393a9..9eded9993f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py @@ -14,3 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + +"""Init pydolphinscheduler.tasks package.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py index e60c78bfd5..825902fce3 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py @@ -15,20 +15,22 @@ # specific language governing permissions and limitations # under the License. +"""Task shell.""" + from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core.task import Task, TaskParams class Shell(Task): - # TODO maybe we could use instance name to replace attribute `name` - # which is simplify as `task_shell = Shell(command = "echo 1")` and - # task.name assign to `task_shell` + """Task shell object, declare behavior for shell task to dolphinscheduler. 
+ + TODO maybe we could use instance name to replace attribute `name` + which is simplify as `task_shell = Shell(command = "echo 1")` and + task.name assign to `task_shell` + """ + def __init__( - self, - name: str, - command: str, - task_type: str = TaskType.SHELL, - *args, **kwargs + self, name: str, command: str, task_type: str = TaskType.SHELL, *args, **kwargs ): task_params = TaskParams(raw_script=command) super().__init__(name, task_type, task_params, *args, **kwargs) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py index 13a83393a9..f8d3fbf62f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py @@ -14,3 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + +"""Init utils package.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py index c3bab71293..3fb6a241bc 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py @@ -15,16 +15,22 @@ # specific language governing permissions and limitations # under the License. 
+"""String util function collections.""" + + def attr2camel(attr: str, include_private=True): + """Convert class attribute name to camel case.""" if include_private: attr = attr.lstrip("_") return snake2camel(attr) def snake2camel(snake: str): + """Convert snake case to camel case.""" components = snake.split("_") return components[0] + "".join(x.title() for x in components[1:]) def class_name2camel(class_name: str): + """Convert class name string to camel case.""" return class_name[0].lower() + class_name[1:] diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/__init__.py b/dolphinscheduler-python/pydolphinscheduler/tests/__init__.py index 13a83393a9..5ce1f82a1a 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/__init__.py @@ -14,3 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + +"""Init tests package.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/__init__.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/__init__.py index 13a83393a9..62ce0ea4ee 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/__init__.py @@ -14,3 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License.
+ +"""Init core package tests.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index f4b6b1c89c..96d0a88114 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -15,9 +15,14 @@ # specific language governing permissions and limitations # under the License. +"""Test process definition.""" + import pytest -from pydolphinscheduler.constants import ProcessDefinitionDefault, ProcessDefinitionReleaseState +from pydolphinscheduler.constants import ( + ProcessDefinitionDefault, + ProcessDefinitionReleaseState, +) from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.task import TaskParams from pydolphinscheduler.side import Tenant, Project, User @@ -26,15 +31,13 @@ from tests.testing.task import Task TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition" -@pytest.mark.parametrize( - "func", - [ - "run", "submit", "start" - ] -) +@pytest.mark.parametrize("func", ["run", "submit", "start"]) def test_process_definition_key_attr(func): + """Test process definition have specific functions or attributes.""" with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: - assert hasattr(pd, func), f"ProcessDefinition instance don't have attribute `{func}`" + assert hasattr( + pd, func + ), f"ProcessDefinition instance don't have attribute `{func}`" @pytest.mark.parametrize( @@ -42,21 +45,29 @@ def test_process_definition_key_attr(func): [ ("project", Project(ProcessDefinitionDefault.PROJECT)), ("tenant", Tenant(ProcessDefinitionDefault.TENANT)), - ("user", User(ProcessDefinitionDefault.USER, - ProcessDefinitionDefault.USER_PWD, - ProcessDefinitionDefault.USER_EMAIL, - ProcessDefinitionDefault.USER_PHONE, - ProcessDefinitionDefault.TENANT, - 
ProcessDefinitionDefault.QUEUE, - ProcessDefinitionDefault.USER_STATE)), + ( + "user", + User( + ProcessDefinitionDefault.USER, + ProcessDefinitionDefault.USER_PWD, + ProcessDefinitionDefault.USER_EMAIL, + ProcessDefinitionDefault.USER_PHONE, + ProcessDefinitionDefault.TENANT, + ProcessDefinitionDefault.QUEUE, + ProcessDefinitionDefault.USER_STATE, + ), + ), ("worker_group", ProcessDefinitionDefault.WORKER_GROUP), ("release_state", ProcessDefinitionReleaseState.ONLINE), ], ) def test_process_definition_default_value(name, value): + """Test process definition default attributes.""" with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: - assert getattr(pd, name) == value, \ - f"ProcessDefinition instance attribute `{name}` have not except default value `{getattr(pd, name)}`" + assert getattr(pd, name) == value, ( + f"ProcessDefinition instance attribute `{name}` not with " + f"except default value `{getattr(pd, name)}`" + ) @pytest.mark.parametrize( @@ -68,13 +79,16 @@ def test_process_definition_default_value(name, value): ], ) def test_process_definition_set_attr(name, cls, expect): + """Test process definition set specific attributes.""" with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: setattr(pd, name, cls(expect)) assert getattr(pd, name) == cls( - expect), f"ProcessDefinition set attribute `{name}` do not work expect" + expect + ), f"ProcessDefinition set attribute `{name}` do not work expect" def test_process_definition_to_dict_without_task(): + """Test process definition function to_dict without task.""" expect = { "name": TEST_PROCESS_DEFINITION_NAME, "description": None, @@ -93,11 +107,14 @@ def test_process_definition_to_dict_without_task(): def test_process_definition_simple(): + """Test process definition simple create workflow, including process definition, task, relation define.""" expect_tasks_num = 5 with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: for i in range(expect_tasks_num): task_params = 
TaskParams(raw_script=f"test-raw-script-{i}") - curr_task = Task(name=f"task-{i}", task_type=f"type-{i}", task_params=task_params) + curr_task = Task( + name=f"task-{i}", task_type=f"type-{i}", task_params=task_params + ) # Set deps task i as i-1 parent if i > 0: pre_task = pd.get_one_task_by_name(f"task-{i - 1}") @@ -113,10 +130,18 @@ def test_process_definition_simple(): task: Task = pd.get_one_task_by_name(f"task-{i}") if i == 0: assert task._upstream_task_codes == set() - assert task._downstream_task_codes == {pd.get_one_task_by_name("task-1").code} + assert task._downstream_task_codes == { + pd.get_one_task_by_name("task-1").code + } elif i == expect_tasks_num - 1: - assert task._upstream_task_codes == {pd.get_one_task_by_name(f"task-{i - 1}").code} + assert task._upstream_task_codes == { + pd.get_one_task_by_name(f"task-{i - 1}").code + } assert task._downstream_task_codes == set() else: - assert task._upstream_task_codes == {pd.get_one_task_by_name(f"task-{i - 1}").code} - assert task._downstream_task_codes == {pd.get_one_task_by_name(f"task-{i + 1}").code} + assert task._upstream_task_codes == { + pd.get_one_task_by_name(f"task-{i - 1}").code + } + assert task._downstream_task_codes == { + pd.get_one_task_by_name(f"task-{i + 1}").code + } diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py index 6e0342806d..ef5d363b1b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. 
+"""Test Task class function.""" from unittest.mock import patch @@ -22,6 +23,7 @@ from pydolphinscheduler.core.task import TaskParams, TaskRelation, Task def test_task_params_to_dict(): + """Test TaskParams object function to_dict.""" raw_script = "test_task_params_to_dict" expect = { "resourceList": [], @@ -29,13 +31,14 @@ def test_task_params_to_dict(): "rawScript": raw_script, "dependence": {}, "conditionResult": TaskParams.DEFAULT_CONDITION_RESULT, - "waitStartTimeout": {} + "waitStartTimeout": {}, } task_param = TaskParams(raw_script=raw_script) assert task_param.to_dict() == expect def test_task_relation_to_dict(): + """Test TaskRelation object function to_dict.""" pre_task_code = 123 post_task_code = 456 expect = { @@ -45,13 +48,16 @@ def test_task_relation_to_dict(): "preTaskVersion": 1, "postTaskVersion": 1, "conditionType": 0, - "conditionParams": {} + "conditionParams": {}, } - task_param = TaskRelation(pre_task_code=pre_task_code, post_task_code=post_task_code) + task_param = TaskRelation( + pre_task_code=pre_task_code, post_task_code=post_task_code + ) assert task_param.to_dict() == expect def test_task_to_dict(): + """Test Task object function to_dict.""" code = 123 version = 1 name = "test_task_to_dict" @@ -69,15 +75,8 @@ def test_task_to_dict(): "localParams": [], "rawScript": raw_script, "dependence": {}, - "conditionResult": { - "successNode": [ - "" - ], - "failedNode": [ - "" - ] - }, - "waitStartTimeout": {} + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, }, "flag": "YES", "taskPriority": "MEDIUM", @@ -86,12 +85,11 @@ def test_task_to_dict(): "failRetryInterval": 1, "timeoutFlag": "CLOSE", "timeoutNotifyStrategy": None, - "timeout": 0 + "timeout": 0, } - with patch('pydolphinscheduler.core.task.Task.gen_code_and_version', return_value=(code, version)): - task = Task( - name=name, - task_type=task_type, - task_params=TaskParams(raw_script) - ) + with patch( + 
"pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + task = Task(name=name, task_type=task_type, task_params=TaskParams(raw_script)) assert task.to_dict() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/__init__.py index 13a83393a9..095e3013e5 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/__init__.py @@ -14,3 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + +"""Init tasks package tests.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py index 91cc431ffc..f5f5cfa373 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. 
+"""Test Task shell.""" + from unittest.mock import patch @@ -22,6 +24,7 @@ from pydolphinscheduler.tasks.shell import Shell def test_shell_to_dict(): + """Test task shell function to_dict.""" code = 123 version = 1 name = "test_shell_to_dict" @@ -38,15 +41,8 @@ def test_shell_to_dict(): "localParams": [], "rawScript": command, "dependence": {}, - "conditionResult": { - "successNode": [ - "" - ], - "failedNode": [ - "" - ] - }, - "waitStartTimeout": {} + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, }, "flag": "YES", "taskPriority": "MEDIUM", @@ -55,8 +51,11 @@ def test_shell_to_dict(): "failRetryInterval": 1, "timeoutFlag": "CLOSE", "timeoutNotifyStrategy": None, - "timeout": 0 + "timeout": 0, } - with patch('pydolphinscheduler.core.task.Task.gen_code_and_version', return_value=(code, version)): + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): shell = Shell(name, command) assert shell.to_dict() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py index 200c06d8b7..d0456a6af2 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py @@ -15,17 +15,21 @@ # specific language governing permissions and limitations # under the License. 
+"""Test pydolphinscheduler java gateway.""" + from py4j.java_gateway import java_import, JavaGateway def test_gateway_connect(): + """Test whether client could connect to java gateway or not.""" gateway = JavaGateway() app = gateway.entry_point assert app.ping() == "PONG" def test_jvm_simple(): + """Test use JVM built-in object and operator from java gateway.""" gateway = JavaGateway() smaller = gateway.jvm.java.lang.Integer.MIN_VALUE bigger = gateway.jvm.java.lang.Integer.MAX_VALUE @@ -33,12 +37,14 @@ def test_jvm_simple(): def test_python_client_java_import_single(): + """Test import single class from java gateway.""" gateway = JavaGateway() java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.FileUtils") assert hasattr(gateway.jvm, "FileUtils") def test_python_client_java_import_package(): + """Test import package contain multiple class from java gateway.""" gateway = JavaGateway() java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.*") # test if jvm view have some common utils diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/testing/__init__.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/__init__.py index 13a83393a9..c8caf5b5af 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/testing/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/__init__.py @@ -14,3 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License.
+ +"""Init testing package, it provides an easy way for pydolphinscheduler test.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py index 32d3ffaf73..e0affc9f85 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py @@ -15,13 +15,18 @@ # specific language governing permissions and limitations # under the License. +"""Mock class Task for other test.""" + import uuid from pydolphinscheduler.core.task import Task as SourceTask class Task(SourceTask): + """Mock class :class:`pydolphinscheduler.core.task.Task` for unittest.""" + DEFAULT_VERSION = 1 def gen_code_and_version(self): + """Mock java gateway code and version, convenience method for unittest.""" return uuid.uuid1().time, self.DEFAULT_VERSION