From 1417967d9eebd8e2a5cd4f3b47449cb5be1bdb5d Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Wed, 5 Jan 2022 19:58:44 +0800 Subject: [PATCH] [python] Task condition missing two downstream param (#7783) * [python] Task condition missing two downstream param We add two downstream tasks to set task condition success and failed node close: #7763 * Add getter and setter property condition_resulth in base task --- .../examples/task_conditions_example.py | 35 +++++----- .../src/pydolphinscheduler/core/task.py | 12 +++- .../src/pydolphinscheduler/tasks/condition.py | 21 +++++- .../tests/tasks/test_condition.py | 64 ++++++++++++------- 4 files changed, 89 insertions(+), 43 deletions(-) diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py index 8b66b7996a..6c1b039a3b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py @@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow like: -pre_task_success_1 -> - \ -pre_task_success_2 -> --> conditions -> end - / -pre_task_fail -> +pre_task_1 -> -> success_branch + \ / +pre_task_2 -> -> conditions -> + / \ +pre_task_3 -> -> fail_branch . """ @@ -35,22 +35,23 @@ from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions from pydolphinscheduler.tasks.shell import Shell with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd: - condition_pre_task_1 = Shell( - name="pre_task_success_1", command="echo pre_task_success_1" - ) - condition_pre_task_2 = Shell( - name="pre_task_success_2", command="echo pre_task_success_2" - ) - condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail") + pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1") + pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2") + pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3") cond_operator = And( And( - SUCCESS(condition_pre_task_1, condition_pre_task_2), - FAILURE(condition_pre_task_3), + SUCCESS(pre_task_1, pre_task_2), + FAILURE(pre_task_3), ), ) - end = Shell(name="end", command="echo parent") + success_branch = Shell(name="success_branch", command="echo success_branch") + fail_branch = Shell(name="fail_branch", command="echo fail_branch") - condition = Conditions(name="conditions", condition=cond_operator) - condition >> end + condition = Conditions( + name="conditions", + condition=cond_operator, + success_task=success_branch, + failed_task=fail_branch, + ) pd.submit() diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 8a90efcd70..693f508c23 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -156,7 +156,7 @@ class Task(Base): self.resource_list = resource_list or [] self.dependence = dependence or {} self.wait_start_timeout = wait_start_timeout or {} - self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT + self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT @property def process_definition(self) -> Optional[ProcessDefinition]: @@ -168,6 +168,16 @@ class Task(Base): """Set attribute process_definition.""" self._process_definition = process_definition + @property + def condition_result(self) -> Dict: + """Get attribute condition_result.""" + return self._condition_result + + @condition_result.setter + def condition_result(self, condition_result: Optional[Dict]): + """Set attribute condition_result.""" + self._condition_result = condition_result + @property def task_params(self) -> Optional[Dict]: """Get task parameter object. diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py index 905a41baa7..895a29bfdf 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py @@ -157,9 +157,19 @@ class Or(ConditionOperator): class Conditions(Task): """Task condition object, declare behavior for condition task to dolphinscheduler.""" - def __init__(self, name: str, condition: ConditionOperator, *args, **kwargs): + def __init__( + self, + name: str, + condition: ConditionOperator, + success_task: Task, + failed_task: Task, + *args, + **kwargs, + ): super().__init__(name, TaskType.CONDITIONS, *args, **kwargs) self.condition = condition + self.success_task = success_task + self.failed_task = failed_task # Set condition tasks as current task downstream self._set_dep() @@ -171,6 +181,15 @@ class Conditions(Task): for status in cond.args: upstream.extend(list(status.tasks)) self.set_upstream(upstream) + self.set_downstream([self.success_task, self.failed_task]) + + @property + def condition_result(self) -> Dict: + """Get condition result define for java gateway.""" + return { + "successNode": [self.success_task.code], + "failedNode": [self.failed_task.code], + } @property def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py index 9933c4f093..6085de23b0 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py @@ -324,7 +324,7 @@ def test_condition_operator_set_define_attr_mix_operator( "pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version", return_value=(123, 1), ) -def test_dependent_get_define(mock_condition_code_version, mock_task_code_version): +def test_condition_get_define(mock_condition_code_version, mock_task_code_version): """Test task condition :func:`get_define`.""" common_task = Task(name="common_task", task_type="test_task_condition") cond_operator = And( @@ -372,7 +372,10 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio }, ], }, - "conditionResult": {"successNode": [""], "failedNode": [""]}, + "conditionResult": { + "successNode": [common_task.code], + "failedNode": [common_task.code], + }, "waitStartTimeout": {}, }, "flag": "YES", @@ -385,7 +388,9 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio "timeout": 0, } - task = Conditions(name, condition=cond_operator) + task = Conditions( + name, condition=cond_operator, success_task=common_task, failed_task=common_task + ) assert task.get_define() == expect @@ -396,49 +401,60 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio def test_condition_set_dep_workflow(mock_task_code_version): """Test task condition set dependence in workflow level.""" with ProcessDefinition(name="test-condition-set-dep-workflow") as pd: - condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE) - condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE) - condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE) + pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE) + pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE) + pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE) cond_operator = And( And( - SUCCESS(condition_pre_task_1, condition_pre_task_2), - FAILURE(condition_pre_task_3), + SUCCESS(pre_task_1, pre_task_2), + FAILURE(pre_task_3), ), ) - end = Task(name="end", task_type=TEST_TYPE) - condition = Conditions(name="conditions", condition=cond_operator) - condition >> end + success_branch = Task(name="success_branch", task_type=TEST_TYPE) + fail_branch = Task(name="fail_branch", task_type=TEST_TYPE) + + condition = Conditions( + name="conditions", + condition=cond_operator, + success_task=success_branch, + failed_task=fail_branch, + ) # General tasks test - assert len(pd.tasks) == 5 + assert len(pd.tasks) == 6 assert sorted(pd.task_list, key=lambda t: t.name) == sorted( [ + pre_task_1, + pre_task_2, + pre_task_3, + success_branch, + fail_branch, condition, - condition_pre_task_1, - condition_pre_task_2, - condition_pre_task_3, - end, ], key=lambda t: t.name, ) # Task dep test - assert end._upstream_task_codes == {condition.code} - assert condition._downstream_task_codes == {end.code} + assert success_branch._upstream_task_codes == {condition.code} + assert fail_branch._upstream_task_codes == {condition.code} + assert condition._downstream_task_codes == { + success_branch.code, + fail_branch.code, + } # Condition task dep after ProcessDefinition function get_define called assert condition._upstream_task_codes == { - condition_pre_task_1.code, - condition_pre_task_2.code, - condition_pre_task_3.code, + pre_task_1.code, + pre_task_2.code, + pre_task_3.code, } assert all( [ child._downstream_task_codes == {condition.code} for child in [ - condition_pre_task_1, - condition_pre_task_2, - condition_pre_task_3, + pre_task_1, + pre_task_2, + pre_task_3, ] ] )