Browse Source

[python] Add param workgroup to process definition (#6667)

3.0.0/version-upgrade
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
1165afbdd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  2. 3
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -60,6 +60,7 @@ class ProcessDefinition(Base):
"description", "description",
"_project", "_project",
"_tenant", "_tenant",
"worker_group",
"timeout", "timeout",
"release_state", "release_state",
"param", "param",
@ -76,6 +77,7 @@ class ProcessDefinition(Base):
project: Optional[str] = ProcessDefinitionDefault.PROJECT, project: Optional[str] = ProcessDefinitionDefault.PROJECT,
tenant: Optional[str] = ProcessDefinitionDefault.TENANT, tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
queue: Optional[str] = ProcessDefinitionDefault.QUEUE, queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
timeout: Optional[int] = 0, timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE, release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
param: Optional[List] = None param: Optional[List] = None
@ -85,6 +87,7 @@ class ProcessDefinition(Base):
self._project = project self._project = project
self._tenant = tenant self._tenant = tenant
self._queue = queue self._queue = queue
self.worker_group = worker_group
self.timeout = timeout self.timeout = timeout
self.release_state = release_state self.release_state = release_state
self.param = param self.param = param
@ -226,6 +229,7 @@ class ProcessDefinition(Base):
str(self.param) if self.param else None, str(self.param) if self.param else None,
json.dumps(self.task_location), json.dumps(self.task_location),
self.timeout, self.timeout,
self.worker_group,
self._tenant, self._tenant,
# TODO add serialization function # TODO add serialization function
json.dumps(self.task_relation_json), json.dumps(self.task_relation_json),
@ -244,6 +248,6 @@ class ProcessDefinition(Base):
self._project, self._project,
self.name, self.name,
"", "",
"default", self.worker_group,
24 * 3600, 24 * 3600,
) )

3
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -49,6 +49,7 @@ def test_process_definition_key_attr(func):
ProcessDefinitionDefault.TENANT, ProcessDefinitionDefault.TENANT,
ProcessDefinitionDefault.QUEUE, ProcessDefinitionDefault.QUEUE,
ProcessDefinitionDefault.USER_STATE)), ProcessDefinitionDefault.USER_STATE)),
("worker_group", ProcessDefinitionDefault.WORKER_GROUP),
("release_state", ProcessDefinitionReleaseState.ONLINE), ("release_state", ProcessDefinitionReleaseState.ONLINE),
], ],
) )
@ -63,6 +64,7 @@ def test_process_definition_default_value(name, value):
[ [
("project", Project, "project"), ("project", Project, "project"),
("tenant", Tenant, "tenant"), ("tenant", Tenant, "tenant"),
("worker_group", str, "worker_group"),
], ],
) )
def test_process_definition_set_attr(name, cls, expect): def test_process_definition_set_attr(name, cls, expect):
@ -78,6 +80,7 @@ def test_process_definition_to_dict_without_task():
"description": None, "description": None,
"project": ProcessDefinitionDefault.PROJECT, "project": ProcessDefinitionDefault.PROJECT,
"tenant": ProcessDefinitionDefault.TENANT, "tenant": ProcessDefinitionDefault.TENANT,
"workerGroup": ProcessDefinitionDefault.WORKER_GROUP,
"timeout": 0, "timeout": 0,
"releaseState": ProcessDefinitionReleaseState.ONLINE, "releaseState": ProcessDefinitionReleaseState.ONLINE,
"param": None, "param": None,

Loading…
Cancel
Save