From 80ebe4a33431c88c7514b3ad0d9ffdf645e1bba2 Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Tue, 24 May 2022 10:25:19 +0800 Subject: [PATCH] [python] Fix process definition attr release state not work (#10151) This patch fix the error release state not work when it set to offline and submit it to Java gateway, it error because we do not pass the attribute to Java gateway function `createOrUpdateProcessDefinition` close: #9779 (cherry picked from commit 56e0ea802d93667de8c48796a9291a775abddd49) --- .../api/python/PythonGateway.java | 3 +- .../pydolphinscheduler/docs/source/config.rst | 68 ++++++++++--------- .../src/pydolphinscheduler/constants.py | 7 -- .../pydolphinscheduler/core/configuration.py | 3 + .../core/default_config.yaml | 6 ++ .../core/process_definition.py | 26 ++++++- .../tests/core/test_process_definition.py | 41 +++++++++-- .../tests/utils/test_yaml_parser.py | 9 +-- 8 files changed, 111 insertions(+), 52 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index 4fb74f1e66..817f411854 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -221,6 +221,7 @@ public class PythonGateway { int timeout, String workerGroup, String tenantCode, + int releaseState, String taskRelationJson, String taskDefinitionJson, ProcessExecutionTypeEnum executionType) { @@ -248,7 +249,7 @@ public class PythonGateway { if (schedule != null) { createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId); } - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE); + processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.getEnum(releaseState)); return processDefinitionCode; } diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst index b7879dd94e..2b804d0c62 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst @@ -78,39 +78,41 @@ All Configurations in Environment Variables All environment variables as below, and you could modify their value via `Bash `_ or `Python OS Module `_ -+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| Variable Section | Variable Name | description | -+==================+====================================+==================================================================================================================+ -| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. | -+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. | -+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. | -+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. | -+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+ ++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| Variable Section | Variable Name | description | ++==================+====================================+====================================================================================================================+ +| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. | ++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. | ++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. | ++ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. | ++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+ .. note:: diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index 3992917310..262469c88f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -18,13 +18,6 @@ """Constants for pydolphinscheduler.""" -class ProcessDefinitionReleaseState: - """Constants for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` release state.""" - - ONLINE: str = "ONLINE" - OFFLINE: str = "OFFLINE" - - class TaskPriority(str): """Constants for task priority.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py index 03ac0977e4..860f9869f3 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py @@ -177,6 +177,9 @@ WORKFLOW_USER = os.environ.get( WORKFLOW_QUEUE = os.environ.get( "PYDS_WORKFLOW_QUEUE", configs.get("default.workflow.queue") ) +WORKFLOW_RELEASE_STATE = os.environ.get( + "PYDS_WORKFLOW_RELEASE_STATE", configs.get("default.workflow.release_state") +) WORKFLOW_WORKER_GROUP = os.environ.get( "PYDS_WORKFLOW_WORKER_GROUP", configs.get("default.workflow.worker_group") ) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml index e437e55c31..5541af7b79 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml @@ -48,5 +48,11 @@ default: user: userPythonGateway queue: queuePythonGateway worker_group: default + # Release state of workflow, default value is ``online`` which mean setting workflow online when it submits + # to Java gateway, if you want to set workflow offline set its value to ``offline`` + release_state: online time_zone: Asia/Shanghai + # Warning type of the workflow, default value is ``NONE`` mean do not warn user in any cases of workflow state, + # change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are + # ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL`` warning_type: NONE diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index dbf2c41795..cef01706df 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -21,7 +21,7 @@ import json from datetime import datetime from typing import Any, Dict, List, Optional, Set -from pydolphinscheduler.constants import ProcessDefinitionReleaseState, TaskType +from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core import configuration from pydolphinscheduler.core.base import Base from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException @@ -105,7 +105,7 @@ class ProcessDefinition(Base): warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE, warning_group_id: Optional[int] = 0, timeout: Optional[int] = 0, - release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE, + release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE, param: Optional[Dict] = None, ): super().__init__(name, description) @@ -126,7 +126,7 @@ class ProcessDefinition(Base): self.warning_type = warning_type.strip().upper() self.warning_group_id = warning_group_id self.timeout = timeout - self.release_state = release_state + self._release_state = release_state self.param = param self.tasks: dict = {} # TODO how to fix circle import @@ -197,6 +197,25 @@ class ProcessDefinition(Base): """Set attribute end_time.""" self._end_time = val + @property + def release_state(self) -> int: + """Get attribute release_state.""" + rs_ref = { + "online": 1, + "offline": 0, + } + if self._release_state not in rs_ref: + raise PyDSParamException( + "Parameter release_state only support `online` or `offline` but get %", + self._release_state, + ) + return rs_ref[self._release_state] + + @release_state.setter + def release_state(self, val: str) -> None: + """Set attribute release_state.""" + self._release_state = val.lower() + @property def param_json(self) -> Optional[List[Dict]]: """Return param json base on self.param.""" @@ -381,6 +400,7 @@ class ProcessDefinition(Base): self.timeout, self.worker_group, self._tenant, + self.release_state, # TODO add serialization function json.dumps(self.task_relation_json), json.dumps(self.task_definition_json), diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index 36e1cb035e..63580de467 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -24,7 +24,6 @@ from unittest.mock import patch import pytest from freezegun import freeze_time -from pydolphinscheduler.constants import ProcessDefinitionReleaseState from pydolphinscheduler.core import configuration from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.exceptions import PyDSParamException @@ -67,7 +66,7 @@ def test_process_definition_key_attr(func): ("worker_group", configuration.WORKFLOW_WORKER_GROUP), ("warning_type", configuration.WORKFLOW_WARNING_TYPE), ("warning_group_id", 0), - ("release_state", ProcessDefinitionReleaseState.ONLINE), + ("release_state", 1), ], ) def test_process_definition_default_value(name, value): @@ -90,7 +89,6 @@ def test_process_definition_default_value(name, value): ("warning_type", str, "FAILURE"), ("warning_group_id", int, 1), ("timeout", int, 1), - ("release_state", str, "OFFLINE"), ("param", dict, {"key": "value"}), ], ) @@ -103,6 +101,41 @@ def test_set_attr(name, cls, expect): ), f"ProcessDefinition set attribute `{name}` do not work expect" +@pytest.mark.parametrize( + "value,expect", + [ + ("online", 1), + ("offline", 0), + ], +) +def test_set_release_state(value, expect): + """Test process definition set release_state attributes.""" + with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value) as pd: + assert ( + getattr(pd, "release_state") == expect + ), "ProcessDefinition set attribute release_state do not return expect value." + + +@pytest.mark.parametrize( + "value", + [ + "oneline", + "offeline", + 1, + 0, + None, + ], +) +def test_set_release_state_error(value): + """Test process definition set release_state attributes with error.""" + pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value) + with pytest.raises( + PyDSParamException, + match="Parameter release_state only support `online` or `offline` but get.*", + ): + pd.release_state + + @pytest.mark.parametrize( "set_attr,set_val,get_attr,get_val", [ @@ -283,7 +316,7 @@ def test_process_definition_get_define_without_task(): "warningType": configuration.WORKFLOW_WARNING_TYPE, "warningGroupId": 0, "timeout": 0, - "releaseState": ProcessDefinitionReleaseState.ONLINE, + "releaseState": 1, "param": None, "tasks": {}, "taskDefinitionJson": [{}], diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py index 4b1b05ed33..ad3aaf7bd1 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py @@ -58,10 +58,11 @@ expects = [ "default.workflow.project": ("project-pydolphin", "project-pydolphinEdit"), "default.workflow.tenant": ("tenant_pydolphin", "SmithEdit"), "default.workflow.user": ("userPythonGateway", "SmithEdit"), - "default.workflow.queue": ("queuePythonGateway", "SmithEdit"), - "default.workflow.worker_group": ("default", "SmithEdit"), - "default.workflow.time_zone": ("Asia/Shanghai", "SmithEdit"), - "default.workflow.warning_type": ("NONE", "SmithEdit"), + "default.workflow.queue": ("queuePythonGateway", "queueEdit"), + "default.workflow.worker_group": ("default", "wgEdit"), + "default.workflow.release_state": ("online", "offline"), + "default.workflow.time_zone": ("Asia/Shanghai", "Europe/Amsterdam"), + "default.workflow.warning_type": ("NONE", "SUCCESS"), }, ]