Browse Source

[fix][python] Task switch branch not show in webui (#12120)

when create task switch in python api, switch branch not show correctly,
due to we add some unnecessary attribute to switch task, this patch we
add `_task_ignore_attr` in class `Task` to ignore those unnecessary
attribute from `_task_default_attr`

(cherry picked from commit a86f4e2693)
3.1.0-release
Jiajie Zhong 2 years ago
parent
commit
b0b29ed8e1
  1. 33
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  2. 5
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
  3. 82
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
  4. 2
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

33
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -16,7 +16,7 @@
# under the License.
"""DolphinScheduler Task and TaskRelation object."""
import copy
from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@ -99,6 +99,17 @@ class Task(Base):
"timeout",
}
# task default attribute will into `task_params` property
_task_default_attr = {
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
"condition_result",
}
# task attribute ignore from _task_default_attr and will not into `task_params` property
_task_ignore_attr: set = set()
# task custom attribute define in sub class and will append to `task_params` property
_task_custom_attr: set = set()
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
@ -213,20 +224,24 @@ class Task(Base):
"""Set attribute condition_result."""
self._condition_result = condition_result
def _get_attr(self) -> Set[str]:
"""Get final task task_params attribute.
Base on `_task_default_attr`, append attribute from `_task_custom_attr` and subtract attribute from
`_task_ignore_attr`.
"""
attr = copy.deepcopy(self._task_default_attr)
attr -= self._task_ignore_attr
attr |= self._task_custom_attr
return attr
@property
def task_params(self) -> Optional[Dict]:
"""Get task parameter object.
Will get result to combine _task_custom_attr and custom_attr.
"""
custom_attr = {
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
"condition_result",
}
custom_attr |= self._task_custom_attr
custom_attr = self._get_attr()
return self.get_define_custom(custom_attr=custom_attr)
def __hash__(self):

5
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py

@ -134,6 +134,11 @@ class Switch(Task):
if task `switch` in this workflow.
"""
_task_ignore_attr = {
"condition_result",
"dependence",
}
def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
super().__init__(name, TaskType.SWITCH, *args, **kwargs)
self.condition = condition

82
dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

@ -19,18 +19,86 @@
import logging
import re
from unittest.mock import patch
from typing import Set
from unittest.mock import patch
import pytest
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task, TaskRelation
from tests.testing.task import Task as testTask
from tests.testing.task import Task as TestTask
from tests.testing.task import TaskWithCode
TEST_TASK_RELATION_SET = set()
TEST_TASK_RELATION_SIZE = 0
@pytest.mark.parametrize(
"addition, ignore, expect",
[
(
set(),
set(),
{
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
"condition_result",
},
),
(
set(),
{"dependence", "condition_result", "not_exists"},
{
"local_params",
"resource_list",
"wait_start_timeout",
},
),
(
{
"not_exists_1",
"not_exists_2",
},
set(),
{
"not_exists_1",
"not_exists_2",
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
"condition_result",
},
),
# test addition and ignore conflict to see behavior
(
{
"not_exists",
},
{"condition_result", "not_exists"},
{
"not_exists",
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
},
),
],
)
def test__get_attr(addition: Set, ignore: Set, expect: Set):
"""Test task function `_get_attr`."""
task = TestTask(
name="test-get-attr",
task_type="test",
)
task._task_custom_attr = addition
task._task_ignore_attr = ignore
assert task._get_attr() == expect
@pytest.mark.parametrize(
"attr, expect",
[
@ -72,7 +140,7 @@ TEST_TASK_RELATION_SIZE = 0
)
def test_property_task_params(mock_resource, mock_user_name, attr, expect):
"""Test class task property."""
task = testTask(
task = TestTask(
"test-property-task-params",
"test-task",
**attr,
@ -182,8 +250,8 @@ def test_two_tasks_shift(shift: str):
Here we test both `>>` and `<<` bit operator.
"""
upstream = testTask(name="upstream", task_type=shift)
downstream = testTask(name="downstream", task_type=shift)
upstream = TestTask(name="upstream", task_type=shift)
downstream = TestTask(name="downstream", task_type=shift)
if shift == "<<":
downstream << upstream
elif shift == ">>":
@ -219,10 +287,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
"downstream": "upstream",
}
task_type = "dep_task_and_tasks"
task = testTask(name="upstream", task_type=task_type)
task = TestTask(name="upstream", task_type=task_type)
tasks = [
testTask(name="downstream1", task_type=task_type),
testTask(name="downstream2", task_type=task_type),
TestTask(name="downstream1", task_type=task_type),
TestTask(name="downstream2", task_type=task_type),
]
# Use build-in function eval to simply test case and reduce duplicate code

2
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

@ -236,8 +236,6 @@ def test_switch_get_define(mock_task_code_version):
"taskParams": {
"resourceList": [],
"localParams": [],
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
"switchResult": {
"dependTaskList": [

Loading…
Cancel
Save