Browse Source

[python] Fix process definition attr release state not work (#10151)

This patch fix the error release state not work when it set to
offline and submit it to Java gateway, it error because we do
not pass the attribute to Java gateway function
`createOrUpdateProcessDefinition`

close: #9779
3.1.0-release
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
56e0ea802d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 68
      dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst
  3. 7
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  4. 3
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
  5. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
  6. 26
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  7. 41
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
  8. 9
      dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -221,6 +221,7 @@ public class PythonGateway {
int timeout,
String workerGroup,
String tenantCode,
int releaseState,
String taskRelationJson,
String taskDefinitionJson,
ProcessExecutionTypeEnum executionType) {
@ -248,7 +249,7 @@ public class PythonGateway {
if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
}
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.getEnum(releaseState));
return processDefinitionCode;
}

68
dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst

@ -78,39 +78,41 @@ All Configurations in Environment Variables
All environment variables as below, and you could modify their value via `Bash <by bash>`_ or `Python OS Module <by python os module>`_
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+
| Variable Section | Variable Name | description |
+==================+====================================+==================================================================================================================+
| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. |
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. |
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+
+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Variable Section | Variable Name | description |
+==================+====================================+====================================================================================================================+
| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. |
+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. |
+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
.. note::

7
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -18,13 +18,6 @@
"""Constants for pydolphinscheduler."""
class ProcessDefinitionReleaseState:
"""Constants for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` release state."""
ONLINE: str = "ONLINE"
OFFLINE: str = "OFFLINE"
class TaskPriority(str):
"""Constants for task priority."""

3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py

@ -177,6 +177,9 @@ WORKFLOW_USER = os.environ.get(
WORKFLOW_QUEUE = os.environ.get(
"PYDS_WORKFLOW_QUEUE", configs.get("default.workflow.queue")
)
WORKFLOW_RELEASE_STATE = os.environ.get(
"PYDS_WORKFLOW_RELEASE_STATE", configs.get("default.workflow.release_state")
)
WORKFLOW_WORKER_GROUP = os.environ.get(
"PYDS_WORKFLOW_WORKER_GROUP", configs.get("default.workflow.worker_group")
)

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml

@ -48,5 +48,11 @@ default:
user: userPythonGateway
queue: queuePythonGateway
worker_group: default
# Release state of workflow, default value is ``online`` which mean setting workflow online when it submits
# to Java gateway, if you want to set workflow offline set its value to ``offline``
release_state: online
time_zone: Asia/Shanghai
# Warning type of the workflow, default value is ``NONE`` mean do not warn user in any cases of workflow state,
# change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are
# ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL``
warning_type: NONE

26
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -21,7 +21,7 @@ import json
from datetime import datetime
from typing import Any, Dict, List, Optional, Set
from pydolphinscheduler.constants import ProcessDefinitionReleaseState, TaskType
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
@ -105,7 +105,7 @@ class ProcessDefinition(Base):
warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
warning_group_id: Optional[int] = 0,
timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
):
super().__init__(name, description)
@ -126,7 +126,7 @@ class ProcessDefinition(Base):
self.warning_type = warning_type.strip().upper()
self.warning_group_id = warning_group_id
self.timeout = timeout
self.release_state = release_state
self._release_state = release_state
self.param = param
self.tasks: dict = {}
# TODO how to fix circle import
@ -197,6 +197,25 @@ class ProcessDefinition(Base):
"""Set attribute end_time."""
self._end_time = val
@property
def release_state(self) -> int:
"""Get attribute release_state."""
rs_ref = {
"online": 1,
"offline": 0,
}
if self._release_state not in rs_ref:
raise PyDSParamException(
"Parameter release_state only support `online` or `offline` but get %",
self._release_state,
)
return rs_ref[self._release_state]
@release_state.setter
def release_state(self, val: str) -> None:
"""Set attribute release_state."""
self._release_state = val.lower()
@property
def param_json(self) -> Optional[List[Dict]]:
"""Return param json base on self.param."""
@ -381,6 +400,7 @@ class ProcessDefinition(Base):
self.timeout,
self.worker_group,
self._tenant,
self.release_state,
# TODO add serialization function
json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json),

41
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -24,7 +24,6 @@ from unittest.mock import patch
import pytest
from freezegun import freeze_time
from pydolphinscheduler.constants import ProcessDefinitionReleaseState
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.exceptions import PyDSParamException
@ -67,7 +66,7 @@ def test_process_definition_key_attr(func):
("worker_group", configuration.WORKFLOW_WORKER_GROUP),
("warning_type", configuration.WORKFLOW_WARNING_TYPE),
("warning_group_id", 0),
("release_state", ProcessDefinitionReleaseState.ONLINE),
("release_state", 1),
],
)
def test_process_definition_default_value(name, value):
@ -90,7 +89,6 @@ def test_process_definition_default_value(name, value):
("warning_type", str, "FAILURE"),
("warning_group_id", int, 1),
("timeout", int, 1),
("release_state", str, "OFFLINE"),
("param", dict, {"key": "value"}),
],
)
@ -103,6 +101,41 @@ def test_set_attr(name, cls, expect):
), f"ProcessDefinition set attribute `{name}` do not work expect"
@pytest.mark.parametrize(
"value,expect",
[
("online", 1),
("offline", 0),
],
)
def test_set_release_state(value, expect):
"""Test process definition set release_state attributes."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value) as pd:
assert (
getattr(pd, "release_state") == expect
), "ProcessDefinition set attribute release_state do not return expect value."
@pytest.mark.parametrize(
"value",
[
"oneline",
"offeline",
1,
0,
None,
],
)
def test_set_release_state_error(value):
"""Test process definition set release_state attributes with error."""
pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value)
with pytest.raises(
PyDSParamException,
match="Parameter release_state only support `online` or `offline` but get.*",
):
pd.release_state
@pytest.mark.parametrize(
"set_attr,set_val,get_attr,get_val",
[
@ -283,7 +316,7 @@ def test_process_definition_get_define_without_task():
"warningType": configuration.WORKFLOW_WARNING_TYPE,
"warningGroupId": 0,
"timeout": 0,
"releaseState": ProcessDefinitionReleaseState.ONLINE,
"releaseState": 1,
"param": None,
"tasks": {},
"taskDefinitionJson": [{}],

9
dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py

@ -58,10 +58,11 @@ expects = [
"default.workflow.project": ("project-pydolphin", "project-pydolphinEdit"),
"default.workflow.tenant": ("tenant_pydolphin", "SmithEdit"),
"default.workflow.user": ("userPythonGateway", "SmithEdit"),
"default.workflow.queue": ("queuePythonGateway", "SmithEdit"),
"default.workflow.worker_group": ("default", "SmithEdit"),
"default.workflow.time_zone": ("Asia/Shanghai", "SmithEdit"),
"default.workflow.warning_type": ("NONE", "SmithEdit"),
"default.workflow.queue": ("queuePythonGateway", "queueEdit"),
"default.workflow.worker_group": ("default", "wgEdit"),
"default.workflow.release_state": ("online", "offline"),
"default.workflow.time_zone": ("Asia/Shanghai", "Europe/Amsterdam"),
"default.workflow.warning_type": ("NONE", "SUCCESS"),
},
]

Loading…
Cancel
Save