diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index 940c74920f..84becbcb4c 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -104,6 +104,7 @@ class Delimiter(str): DASH = "/" COLON = ":" UNDERSCORE = "_" + DIRECTION = "->" class Time(str): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 39cca9c0b9..8a90efcd70 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -21,6 +21,7 @@ import logging from typing import Dict, List, Optional, Sequence, Set, Tuple, Union from pydolphinscheduler.constants import ( + Delimiter, ProcessDefinitionDefault, TaskFlag, TaskPriority, @@ -37,6 +38,13 @@ from pydolphinscheduler.java_gateway import launch_gateway class TaskRelation(Base): """TaskRelation object, describe the relation of exactly two tasks.""" + # Add attr `_KEY_ATTR` to overwrite :func:`__eq__`, it is make set + # `Task.process_definition._task_relations` work correctly. + _KEY_ATTR = { + "pre_task_code", + "post_task_code", + } + _DEFINE_ATTR = { "pre_task_code", "post_task_code", @@ -61,7 +69,7 @@ class TaskRelation(Base): self.post_task_code = post_task_code def __hash__(self): - return hash(f"{self.post_task_code}, {self.post_task_code}") + return hash(f"{self.pre_task_code} {Delimiter.DIRECTION} {self.post_task_code}") class Task(Base): @@ -219,6 +227,7 @@ class Task(Base): task_relation = TaskRelation( pre_task_code=task.code, post_task_code=self.code, + name=f"{task.name} {Delimiter.DIRECTION} {self.name}", ) self.process_definition._task_relations.add(task_relation) else: @@ -229,6 +238,7 @@ class Task(Base): task_relation = TaskRelation( pre_task_code=self.code, post_task_code=task.code, + name=f"{self.name} {Delimiter.DIRECTION} {task.name}", ) self.process_definition._task_relations.add(task_relation) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py index b551f072b1..6af731b5ff 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py @@ -24,6 +24,9 @@ import pytest from pydolphinscheduler.core.task import Task, TaskRelation from tests.testing.task import Task as testTask +TEST_TASK_RELATION_SET = set() +TEST_TASK_RELATION_SIZE = 0 + @pytest.mark.parametrize( "attr, expect", @@ -66,6 +69,45 @@ def test_property_task_params(attr, expect): assert expect == task.task_params +@pytest.mark.parametrize( + "pre_code, post_code, expect", + [ + (123, 456, hash("123 -> 456")), + (12345678, 987654321, hash("12345678 -> 987654321")), + ], +) +def test_task_relation_hash_func(pre_code, post_code, expect): + """Test TaskRelation magic function :func:`__hash__`.""" + task_param = TaskRelation(pre_task_code=pre_code, post_task_code=post_code) + assert hash(task_param) == expect + + +@pytest.mark.parametrize( + "pre_code, post_code, size_add", + [ + (123, 456, 1), + (123, 456, 0), + (456, 456, 1), + (123, 123, 1), + (456, 123, 1), + (0, 456, 1), + (123, 0, 1), + ], +) +def test_task_relation_add_to_set(pre_code, post_code, size_add): + """Test TaskRelation with different pre_code and post_code add to set behavior. + + Here we use global variable to keep set of :class:`TaskRelation` instance and the number we expect + of the size when we add a new task relation to exists set. + """ + task_relation = TaskRelation(pre_task_code=pre_code, post_task_code=post_code) + TEST_TASK_RELATION_SET.add(task_relation) + # hint python interpreter use global variable instead of local's + global TEST_TASK_RELATION_SIZE + TEST_TASK_RELATION_SIZE += size_add + assert len(TEST_TASK_RELATION_SET) == TEST_TASK_RELATION_SIZE + + def test_task_relation_to_dict(): """Test TaskRelation object function to_dict.""" pre_task_code = 123 @@ -79,10 +121,10 @@ def test_task_relation_to_dict(): "conditionType": 0, "conditionParams": {}, } - task_param = TaskRelation( + task_relation = TaskRelation( pre_task_code=pre_task_code, post_task_code=post_task_code ) - assert task_param.get_define() == expect + assert task_relation.get_define() == expect def test_task_get_define():