Browse Source

[python] Fix process definition attr release state not work (#10151)

This patch fix the error release state not work when it set to
offline and submit it to Java gateway, it error because we do
not pass the attribute to Java gateway function
`createOrUpdateProcessDefinition`

close: #9779
(cherry picked from commit 56e0ea802d)
3.0.0/version-upgrade
Jiajie Zhong 2 years ago committed by Jiajie Zhong
parent
commit
80ebe4a334
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 68
      dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst
  3. 7
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  4. 3
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
  5. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
  6. 26
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  7. 41
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
  8. 9
      dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -221,6 +221,7 @@ public class PythonGateway {
int timeout, int timeout,
String workerGroup, String workerGroup,
String tenantCode, String tenantCode,
int releaseState,
String taskRelationJson, String taskRelationJson,
String taskDefinitionJson, String taskDefinitionJson,
ProcessExecutionTypeEnum executionType) { ProcessExecutionTypeEnum executionType) {
@ -248,7 +249,7 @@ public class PythonGateway {
if (schedule != null) { if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId); createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
} }
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE); processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.getEnum(releaseState));
return processDefinitionCode; return processDefinitionCode;
} }

68
dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst

@ -78,39 +78,41 @@ All Configurations in Environment Variables
All environment variables as below, and you could modify their value via `Bash <by bash>`_ or `Python OS Module <by python os module>`_ All environment variables as below, and you could modify their value via `Bash <by bash>`_ or `Python OS Module <by python os module>`_
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+ +------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Variable Section | Variable Name | description | | Variable Section | Variable Name | description |
+==================+====================================+==================================================================================================================+ +==================+====================================+====================================================================================================================+
| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. | | | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. | | Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. | | | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. |
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+ +------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. | | | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. | | | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. | | Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. | | | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. | | | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. |
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+ +------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. | | | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. | | | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. | | Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. | | | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. | | | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. | | | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. | | | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. |
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+ + +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
.. note:: .. note::

7
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -18,13 +18,6 @@
"""Constants for pydolphinscheduler.""" """Constants for pydolphinscheduler."""
class ProcessDefinitionReleaseState:
"""Constants for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` release state."""
ONLINE: str = "ONLINE"
OFFLINE: str = "OFFLINE"
class TaskPriority(str): class TaskPriority(str):
"""Constants for task priority.""" """Constants for task priority."""

3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py

@ -177,6 +177,9 @@ WORKFLOW_USER = os.environ.get(
WORKFLOW_QUEUE = os.environ.get( WORKFLOW_QUEUE = os.environ.get(
"PYDS_WORKFLOW_QUEUE", configs.get("default.workflow.queue") "PYDS_WORKFLOW_QUEUE", configs.get("default.workflow.queue")
) )
WORKFLOW_RELEASE_STATE = os.environ.get(
"PYDS_WORKFLOW_RELEASE_STATE", configs.get("default.workflow.release_state")
)
WORKFLOW_WORKER_GROUP = os.environ.get( WORKFLOW_WORKER_GROUP = os.environ.get(
"PYDS_WORKFLOW_WORKER_GROUP", configs.get("default.workflow.worker_group") "PYDS_WORKFLOW_WORKER_GROUP", configs.get("default.workflow.worker_group")
) )

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml

@ -48,5 +48,11 @@ default:
user: userPythonGateway user: userPythonGateway
queue: queuePythonGateway queue: queuePythonGateway
worker_group: default worker_group: default
# Release state of workflow, default value is ``online`` which mean setting workflow online when it submits
# to Java gateway, if you want to set workflow offline set its value to ``offline``
release_state: online
time_zone: Asia/Shanghai time_zone: Asia/Shanghai
# Warning type of the workflow, default value is ``NONE`` mean do not warn user in any cases of workflow state,
# change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are
# ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL``
warning_type: NONE warning_type: NONE

26
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -21,7 +21,7 @@ import json
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional, Set from typing import Any, Dict, List, Optional, Set
from pydolphinscheduler.constants import ProcessDefinitionReleaseState, TaskType from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core import configuration from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base import Base from pydolphinscheduler.core.base import Base
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
@ -105,7 +105,7 @@ class ProcessDefinition(Base):
warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE, warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
warning_group_id: Optional[int] = 0, warning_group_id: Optional[int] = 0,
timeout: Optional[int] = 0, timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE, release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None, param: Optional[Dict] = None,
): ):
super().__init__(name, description) super().__init__(name, description)
@ -126,7 +126,7 @@ class ProcessDefinition(Base):
self.warning_type = warning_type.strip().upper() self.warning_type = warning_type.strip().upper()
self.warning_group_id = warning_group_id self.warning_group_id = warning_group_id
self.timeout = timeout self.timeout = timeout
self.release_state = release_state self._release_state = release_state
self.param = param self.param = param
self.tasks: dict = {} self.tasks: dict = {}
# TODO how to fix circle import # TODO how to fix circle import
@ -197,6 +197,25 @@ class ProcessDefinition(Base):
"""Set attribute end_time.""" """Set attribute end_time."""
self._end_time = val self._end_time = val
@property
def release_state(self) -> int:
"""Get attribute release_state."""
rs_ref = {
"online": 1,
"offline": 0,
}
if self._release_state not in rs_ref:
raise PyDSParamException(
"Parameter release_state only support `online` or `offline` but get %",
self._release_state,
)
return rs_ref[self._release_state]
@release_state.setter
def release_state(self, val: str) -> None:
"""Set attribute release_state."""
self._release_state = val.lower()
@property @property
def param_json(self) -> Optional[List[Dict]]: def param_json(self) -> Optional[List[Dict]]:
"""Return param json base on self.param.""" """Return param json base on self.param."""
@ -381,6 +400,7 @@ class ProcessDefinition(Base):
self.timeout, self.timeout,
self.worker_group, self.worker_group,
self._tenant, self._tenant,
self.release_state,
# TODO add serialization function # TODO add serialization function
json.dumps(self.task_relation_json), json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json), json.dumps(self.task_definition_json),

41
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -24,7 +24,6 @@ from unittest.mock import patch
import pytest import pytest
from freezegun import freeze_time from freezegun import freeze_time
from pydolphinscheduler.constants import ProcessDefinitionReleaseState
from pydolphinscheduler.core import configuration from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.exceptions import PyDSParamException
@ -67,7 +66,7 @@ def test_process_definition_key_attr(func):
("worker_group", configuration.WORKFLOW_WORKER_GROUP), ("worker_group", configuration.WORKFLOW_WORKER_GROUP),
("warning_type", configuration.WORKFLOW_WARNING_TYPE), ("warning_type", configuration.WORKFLOW_WARNING_TYPE),
("warning_group_id", 0), ("warning_group_id", 0),
("release_state", ProcessDefinitionReleaseState.ONLINE), ("release_state", 1),
], ],
) )
def test_process_definition_default_value(name, value): def test_process_definition_default_value(name, value):
@ -90,7 +89,6 @@ def test_process_definition_default_value(name, value):
("warning_type", str, "FAILURE"), ("warning_type", str, "FAILURE"),
("warning_group_id", int, 1), ("warning_group_id", int, 1),
("timeout", int, 1), ("timeout", int, 1),
("release_state", str, "OFFLINE"),
("param", dict, {"key": "value"}), ("param", dict, {"key": "value"}),
], ],
) )
@ -103,6 +101,41 @@ def test_set_attr(name, cls, expect):
), f"ProcessDefinition set attribute `{name}` do not work expect" ), f"ProcessDefinition set attribute `{name}` do not work expect"
@pytest.mark.parametrize(
"value,expect",
[
("online", 1),
("offline", 0),
],
)
def test_set_release_state(value, expect):
"""Test process definition set release_state attributes."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value) as pd:
assert (
getattr(pd, "release_state") == expect
), "ProcessDefinition set attribute release_state do not return expect value."
@pytest.mark.parametrize(
"value",
[
"oneline",
"offeline",
1,
0,
None,
],
)
def test_set_release_state_error(value):
"""Test process definition set release_state attributes with error."""
pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value)
with pytest.raises(
PyDSParamException,
match="Parameter release_state only support `online` or `offline` but get.*",
):
pd.release_state
@pytest.mark.parametrize( @pytest.mark.parametrize(
"set_attr,set_val,get_attr,get_val", "set_attr,set_val,get_attr,get_val",
[ [
@ -283,7 +316,7 @@ def test_process_definition_get_define_without_task():
"warningType": configuration.WORKFLOW_WARNING_TYPE, "warningType": configuration.WORKFLOW_WARNING_TYPE,
"warningGroupId": 0, "warningGroupId": 0,
"timeout": 0, "timeout": 0,
"releaseState": ProcessDefinitionReleaseState.ONLINE, "releaseState": 1,
"param": None, "param": None,
"tasks": {}, "tasks": {},
"taskDefinitionJson": [{}], "taskDefinitionJson": [{}],

9
dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py

@ -58,10 +58,11 @@ expects = [
"default.workflow.project": ("project-pydolphin", "project-pydolphinEdit"), "default.workflow.project": ("project-pydolphin", "project-pydolphinEdit"),
"default.workflow.tenant": ("tenant_pydolphin", "SmithEdit"), "default.workflow.tenant": ("tenant_pydolphin", "SmithEdit"),
"default.workflow.user": ("userPythonGateway", "SmithEdit"), "default.workflow.user": ("userPythonGateway", "SmithEdit"),
"default.workflow.queue": ("queuePythonGateway", "SmithEdit"), "default.workflow.queue": ("queuePythonGateway", "queueEdit"),
"default.workflow.worker_group": ("default", "SmithEdit"), "default.workflow.worker_group": ("default", "wgEdit"),
"default.workflow.time_zone": ("Asia/Shanghai", "SmithEdit"), "default.workflow.release_state": ("online", "offline"),
"default.workflow.warning_type": ("NONE", "SmithEdit"), "default.workflow.time_zone": ("Asia/Shanghai", "Europe/Amsterdam"),
"default.workflow.warning_type": ("NONE", "SUCCESS"),
}, },
] ]

Loading…
Cancel
Save