From d23b3444bb0514f77c821be6dcdf105c9267e60e Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Tue, 28 Dec 2021 11:34:25 +0800 Subject: [PATCH] [python] Fix task condition set wrong deps (#7650) (#7655) After #7505 merged. we could use condition task type but our dependent set in the wrong direction, all the condition operators should be upstream of the current task instead of downstream fix: #7649 --- .../examples/task_conditions_example.py | 29 ++++++------ .../src/pydolphinscheduler/tasks/condition.py | 8 ++-- .../tests/tasks/test_condition.py | 45 ++++++++++--------- 3 files changed, 44 insertions(+), 38 deletions(-) diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py index a7723f3ca2..8b66b7996a 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py @@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow like: - --> condition_success_1 - / -parent -> conditions -> --> condition_success_2 - \ - --> condition_fail +pre_task_success_1 -> + \ +pre_task_success_2 -> --> conditions -> end + / +pre_task_fail -> . """ @@ -35,21 +35,22 @@ from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions from pydolphinscheduler.tasks.shell import Shell with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd: - parent = Shell(name="parent", command="echo parent") - condition_success_1 = Shell( - name="condition_success_1", command="echo condition_success_1" + condition_pre_task_1 = Shell( + name="pre_task_success_1", command="echo pre_task_success_1" ) - condition_success_2 = Shell( - name="condition_success_2", command="echo condition_success_2" + condition_pre_task_2 = Shell( + name="pre_task_success_2", command="echo pre_task_success_2" ) - condition_fail = Shell(name="condition_fail", command="echo condition_fail") + condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail") cond_operator = And( And( - SUCCESS(condition_success_1, condition_success_2), - FAILURE(condition_fail), + SUCCESS(condition_pre_task_1, condition_pre_task_2), + FAILURE(condition_pre_task_3), ), ) + end = Shell(name="end", command="echo parent") + condition = Conditions(name="conditions", condition=cond_operator) - parent >> condition + condition >> end pd.submit() diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py index 2278f49165..905a41baa7 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py @@ -164,13 +164,13 @@ class Conditions(Task): self._set_dep() def _set_dep(self) -> None: - """Set downstream according to parameter `condition`.""" - downstream = [] + """Set upstream according to parameter `condition`.""" + upstream = [] for cond in self.condition.args: if isinstance(cond, ConditionOperator): for status in cond.args: - downstream.extend(list(status.tasks)) - self.set_downstream(downstream) + upstream.extend(list(status.tasks)) + self.set_upstream(upstream) @property def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py index 5647d5b8ab..9933c4f093 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py @@ -396,44 +396,49 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio def test_condition_set_dep_workflow(mock_task_code_version): """Test task condition set dependence in workflow level.""" with ProcessDefinition(name="test-condition-set-dep-workflow") as pd: - parent = Task(name="parent", task_type=TEST_TYPE) - condition_success_1 = Task(name="condition_success_1", task_type=TEST_TYPE) - condition_success_2 = Task(name="condition_success_2", task_type=TEST_TYPE) - condition_fail = Task(name="condition_fail", task_type=TEST_TYPE) + condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE) + condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE) + condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE) cond_operator = And( And( - SUCCESS(condition_success_1, condition_success_2), - FAILURE(condition_fail), + SUCCESS(condition_pre_task_1, condition_pre_task_2), + FAILURE(condition_pre_task_3), ), ) + end = Task(name="end", task_type=TEST_TYPE) + + condition = Conditions(name="conditions", condition=cond_operator) + condition >> end - condition = Conditions(name=TEST_NAME, condition=cond_operator) - parent >> condition # General tasks test assert len(pd.tasks) == 5 assert sorted(pd.task_list, key=lambda t: t.name) == sorted( [ - parent, condition, - condition_success_1, - condition_success_2, - condition_fail, + condition_pre_task_1, + condition_pre_task_2, + condition_pre_task_3, + end, ], key=lambda t: t.name, ) # Task dep test - assert parent._downstream_task_codes == {condition.code} - assert condition._upstream_task_codes == {parent.code} + assert end._upstream_task_codes == {condition.code} + assert condition._downstream_task_codes == {end.code} # Condition task dep after ProcessDefinition function get_define called - assert condition._downstream_task_codes == { - condition_success_1.code, - condition_success_2.code, - condition_fail.code, + assert condition._upstream_task_codes == { + condition_pre_task_1.code, + condition_pre_task_2.code, + condition_pre_task_3.code, } assert all( [ - child._upstream_task_codes == {condition.code} - for child in [condition_success_1, condition_success_2, condition_fail] + child._downstream_task_codes == {condition.code} + for child in [ + condition_pre_task_1, + condition_pre_task_2, + condition_pre_task_3, + ] ] )