diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 93eb32352c..26b71c186d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -16,7 +16,7 @@ # under the License. """DolphinScheduler Task and TaskRelation object.""" - +import copy from logging import getLogger from typing import Dict, List, Optional, Sequence, Set, Tuple, Union @@ -100,6 +100,17 @@ class Task(Base): "timeout", } + # task default attribute will into `task_params` property + _task_default_attr = { + "local_params", + "resource_list", + "dependence", + "wait_start_timeout", + "condition_result", + } + # task attribute ignore from _task_default_attr and will not into `task_params` property + _task_ignore_attr: set = set() + # task custom attribute define in sub class and will append to `task_params` property _task_custom_attr: set = set() ext: set = None @@ -220,20 +231,24 @@ class Task(Base): """Set attribute condition_result.""" self._condition_result = condition_result + def _get_attr(self) -> Set[str]: + """Get final task task_params attribute. + + Base on `_task_default_attr`, append attribute from `_task_custom_attr` and subtract attribute from + `_task_ignore_attr`. + """ + attr = copy.deepcopy(self._task_default_attr) + attr -= self._task_ignore_attr + attr |= self._task_custom_attr + return attr + @property def task_params(self) -> Optional[Dict]: """Get task parameter object. Will get result to combine _task_custom_attr and custom_attr. """ - custom_attr = { - "local_params", - "resource_list", - "dependence", - "wait_start_timeout", - "condition_result", - } - custom_attr |= self._task_custom_attr + custom_attr = self._get_attr() return self.get_define_custom(custom_attr=custom_attr) def get_plugin(self): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py index 35eece89b6..45edaa9aac 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py @@ -134,6 +134,11 @@ class Switch(Task): if task `switch` in this workflow. """ + _task_ignore_attr = { + "condition_result", + "dependence", + } + def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs): super().__init__(name, TaskType.SWITCH, *args, **kwargs) self.condition = condition diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py index cb06c21ae4..c6ef7773ae 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py @@ -18,6 +18,7 @@ """Test Task class function.""" import logging import re +from typing import Set from unittest.mock import PropertyMock, patch import pytest @@ -26,13 +27,79 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.task import Task, TaskRelation from pydolphinscheduler.exceptions import PyResPluginException from pydolphinscheduler.resources_plugin import Local -from tests.testing.task import Task as testTask +from tests.testing.task import Task as TestTask from tests.testing.task import TaskWithCode TEST_TASK_RELATION_SET = set() TEST_TASK_RELATION_SIZE = 0 +@pytest.mark.parametrize( + "addition, ignore, expect", + [ + ( + set(), + set(), + { + "local_params", + "resource_list", + "dependence", + "wait_start_timeout", + "condition_result", + }, + ), + ( + set(), + {"dependence", "condition_result", "not_exists"}, + { + "local_params", + "resource_list", + "wait_start_timeout", + }, + ), + ( + { + "not_exists_1", + "not_exists_2", + }, + set(), + { + "not_exists_1", + "not_exists_2", + "local_params", + "resource_list", + "dependence", + "wait_start_timeout", + "condition_result", + }, + ), + # test addition and ignore conflict to see behavior + ( + { + "not_exists", + }, + {"condition_result", "not_exists"}, + { + "not_exists", + "local_params", + "resource_list", + "dependence", + "wait_start_timeout", + }, + ), + ], +) +def test__get_attr(addition: Set, ignore: Set, expect: Set): + """Test task function `_get_attr`.""" + task = TestTask( + name="test-get-attr", + task_type="test", + ) + task._task_custom_attr = addition + task._task_ignore_attr = ignore + assert task._get_attr() == expect + + @pytest.mark.parametrize( "attr, expect", [ @@ -74,7 +141,7 @@ TEST_TASK_RELATION_SIZE = 0 ) def test_property_task_params(mock_resource, mock_user_name, attr, expect): """Test class task property.""" - task = testTask( + task = TestTask( "test-property-task-params", "test-task", **attr, @@ -184,8 +251,8 @@ def test_two_tasks_shift(shift: str): Here we test both `>>` and `<<` bit operator. """ - upstream = testTask(name="upstream", task_type=shift) - downstream = testTask(name="downstream", task_type=shift) + upstream = TestTask(name="upstream", task_type=shift) + downstream = TestTask(name="downstream", task_type=shift) if shift == "<<": downstream << upstream elif shift == ">>": @@ -221,10 +288,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str): "downstream": "upstream", } task_type = "dep_task_and_tasks" - task = testTask(name="upstream", task_type=task_type) + task = TestTask(name="upstream", task_type=task_type) tasks = [ - testTask(name="downstream1", task_type=task_type), - testTask(name="downstream2", task_type=task_type), + TestTask(name="downstream1", task_type=task_type), + TestTask(name="downstream2", task_type=task_type), ] # Use build-in function eval to simply test case and reduce duplicate code diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py index 3206b12f7e..6f9222cec0 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py @@ -236,8 +236,6 @@ def test_switch_get_define(mock_task_code_version): "taskParams": { "resourceList": [], "localParams": [], - "dependence": {}, - "conditionResult": {"successNode": [""], "failedNode": [""]}, "waitStartTimeout": {}, "switchResult": { "dependTaskList": [