Browse Source

[fix][python] Task switch branch not show in webui (#12120)

when create task switch in python api, switch branch not show correctly,
due to we add some unnecessary attribute to switch task, this patch we
add `_task_ignore_attr` in class `Task` to ignore those unnecessary
attribute from `_task_default_attr`
3.2.0-release
Jiajie Zhong 2 years ago committed by GitHub
parent
commit
a86f4e2693
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  2. 5
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
  3. 81
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
  4. 2
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

33
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -16,7 +16,7 @@
# under the License. # under the License.
"""DolphinScheduler Task and TaskRelation object.""" """DolphinScheduler Task and TaskRelation object."""
import copy
from logging import getLogger from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@ -100,6 +100,17 @@ class Task(Base):
"timeout", "timeout",
} }
# task default attribute will into `task_params` property
_task_default_attr = {
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
"condition_result",
}
# task attribute ignore from _task_default_attr and will not into `task_params` property
_task_ignore_attr: set = set()
# task custom attribute define in sub class and will append to `task_params` property
_task_custom_attr: set = set() _task_custom_attr: set = set()
ext: set = None ext: set = None
@ -220,20 +231,24 @@ class Task(Base):
"""Set attribute condition_result.""" """Set attribute condition_result."""
self._condition_result = condition_result self._condition_result = condition_result
def _get_attr(self) -> Set[str]:
"""Get final task task_params attribute.
Base on `_task_default_attr`, append attribute from `_task_custom_attr` and subtract attribute from
`_task_ignore_attr`.
"""
attr = copy.deepcopy(self._task_default_attr)
attr -= self._task_ignore_attr
attr |= self._task_custom_attr
return attr
@property @property
def task_params(self) -> Optional[Dict]: def task_params(self) -> Optional[Dict]:
"""Get task parameter object. """Get task parameter object.
Will get result to combine _task_custom_attr and custom_attr. Will get result to combine _task_custom_attr and custom_attr.
""" """
custom_attr = { custom_attr = self._get_attr()
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
"condition_result",
}
custom_attr |= self._task_custom_attr
return self.get_define_custom(custom_attr=custom_attr) return self.get_define_custom(custom_attr=custom_attr)
def get_plugin(self): def get_plugin(self):

5
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py

@ -134,6 +134,11 @@ class Switch(Task):
if task `switch` in this workflow. if task `switch` in this workflow.
""" """
_task_ignore_attr = {
"condition_result",
"dependence",
}
def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs): def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
super().__init__(name, TaskType.SWITCH, *args, **kwargs) super().__init__(name, TaskType.SWITCH, *args, **kwargs)
self.condition = condition self.condition = condition

81
dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

@ -18,6 +18,7 @@
"""Test Task class function.""" """Test Task class function."""
import logging import logging
import re import re
from typing import Set
from unittest.mock import PropertyMock, patch from unittest.mock import PropertyMock, patch
import pytest import pytest
@ -26,13 +27,79 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task, TaskRelation from pydolphinscheduler.core.task import Task, TaskRelation
from pydolphinscheduler.exceptions import PyResPluginException from pydolphinscheduler.exceptions import PyResPluginException
from pydolphinscheduler.resources_plugin import Local from pydolphinscheduler.resources_plugin import Local
from tests.testing.task import Task as testTask from tests.testing.task import Task as TestTask
from tests.testing.task import TaskWithCode from tests.testing.task import TaskWithCode
TEST_TASK_RELATION_SET = set() TEST_TASK_RELATION_SET = set()
TEST_TASK_RELATION_SIZE = 0 TEST_TASK_RELATION_SIZE = 0
@pytest.mark.parametrize(
"addition, ignore, expect",
[
(
set(),
set(),
{
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
"condition_result",
},
),
(
set(),
{"dependence", "condition_result", "not_exists"},
{
"local_params",
"resource_list",
"wait_start_timeout",
},
),
(
{
"not_exists_1",
"not_exists_2",
},
set(),
{
"not_exists_1",
"not_exists_2",
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
"condition_result",
},
),
# test addition and ignore conflict to see behavior
(
{
"not_exists",
},
{"condition_result", "not_exists"},
{
"not_exists",
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
},
),
],
)
def test__get_attr(addition: Set, ignore: Set, expect: Set):
"""Test task function `_get_attr`."""
task = TestTask(
name="test-get-attr",
task_type="test",
)
task._task_custom_attr = addition
task._task_ignore_attr = ignore
assert task._get_attr() == expect
@pytest.mark.parametrize( @pytest.mark.parametrize(
"attr, expect", "attr, expect",
[ [
@ -74,7 +141,7 @@ TEST_TASK_RELATION_SIZE = 0
) )
def test_property_task_params(mock_resource, mock_user_name, attr, expect): def test_property_task_params(mock_resource, mock_user_name, attr, expect):
"""Test class task property.""" """Test class task property."""
task = testTask( task = TestTask(
"test-property-task-params", "test-property-task-params",
"test-task", "test-task",
**attr, **attr,
@ -184,8 +251,8 @@ def test_two_tasks_shift(shift: str):
Here we test both `>>` and `<<` bit operator. Here we test both `>>` and `<<` bit operator.
""" """
upstream = testTask(name="upstream", task_type=shift) upstream = TestTask(name="upstream", task_type=shift)
downstream = testTask(name="downstream", task_type=shift) downstream = TestTask(name="downstream", task_type=shift)
if shift == "<<": if shift == "<<":
downstream << upstream downstream << upstream
elif shift == ">>": elif shift == ">>":
@ -221,10 +288,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
"downstream": "upstream", "downstream": "upstream",
} }
task_type = "dep_task_and_tasks" task_type = "dep_task_and_tasks"
task = testTask(name="upstream", task_type=task_type) task = TestTask(name="upstream", task_type=task_type)
tasks = [ tasks = [
testTask(name="downstream1", task_type=task_type), TestTask(name="downstream1", task_type=task_type),
testTask(name="downstream2", task_type=task_type), TestTask(name="downstream2", task_type=task_type),
] ]
# Use build-in function eval to simply test case and reduce duplicate code # Use build-in function eval to simply test case and reduce duplicate code

2
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

@ -236,8 +236,6 @@ def test_switch_get_define(mock_task_code_version):
"taskParams": { "taskParams": {
"resourceList": [], "resourceList": [],
"localParams": [], "localParams": [],
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {}, "waitStartTimeout": {},
"switchResult": { "switchResult": {
"dependTaskList": [ "dependTaskList": [

Loading…
Cancel
Save