diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py index a7723f3ca2..8b66b7996a 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py @@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow like: - --> condition_success_1 - / -parent -> conditions -> --> condition_success_2 - \ - --> condition_fail +pre_task_success_1 -> + \ +pre_task_success_2 -> --> conditions -> end + / +pre_task_fail -> . """ @@ -35,21 +35,22 @@ from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions from pydolphinscheduler.tasks.shell import Shell with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd: - parent = Shell(name="parent", command="echo parent") - condition_success_1 = Shell( - name="condition_success_1", command="echo condition_success_1" + condition_pre_task_1 = Shell( + name="pre_task_success_1", command="echo pre_task_success_1" ) - condition_success_2 = Shell( - name="condition_success_2", command="echo condition_success_2" + condition_pre_task_2 = Shell( + name="pre_task_success_2", command="echo pre_task_success_2" ) - condition_fail = Shell(name="condition_fail", command="echo condition_fail") + condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail") cond_operator = And( And( - SUCCESS(condition_success_1, condition_success_2), - FAILURE(condition_fail), + SUCCESS(condition_pre_task_1, condition_pre_task_2), + FAILURE(condition_pre_task_3), ), ) + end = Shell(name="end", command="echo parent") + condition = Conditions(name="conditions", condition=cond_operator) - parent >> condition + condition >> end pd.submit() diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py index 2278f49165..905a41baa7 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py @@ -164,13 +164,13 @@ class Conditions(Task): self._set_dep() def _set_dep(self) -> None: - """Set downstream according to parameter `condition`.""" - downstream = [] + """Set upstream according to parameter `condition`.""" + upstream = [] for cond in self.condition.args: if isinstance(cond, ConditionOperator): for status in cond.args: - downstream.extend(list(status.tasks)) - self.set_downstream(downstream) + upstream.extend(list(status.tasks)) + self.set_upstream(upstream) @property def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py index 5647d5b8ab..9933c4f093 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py @@ -396,44 +396,49 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio def test_condition_set_dep_workflow(mock_task_code_version): """Test task condition set dependence in workflow level.""" with ProcessDefinition(name="test-condition-set-dep-workflow") as pd: - parent = Task(name="parent", task_type=TEST_TYPE) - condition_success_1 = Task(name="condition_success_1", task_type=TEST_TYPE) - condition_success_2 = Task(name="condition_success_2", task_type=TEST_TYPE) - condition_fail = Task(name="condition_fail", task_type=TEST_TYPE) + condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE) + condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE) + condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE) cond_operator = And( And( - SUCCESS(condition_success_1, condition_success_2), - FAILURE(condition_fail), + SUCCESS(condition_pre_task_1, condition_pre_task_2), + FAILURE(condition_pre_task_3), ), ) + end = Task(name="end", task_type=TEST_TYPE) + + condition = Conditions(name="conditions", condition=cond_operator) + condition >> end - condition = Conditions(name=TEST_NAME, condition=cond_operator) - parent >> condition # General tasks test assert len(pd.tasks) == 5 assert sorted(pd.task_list, key=lambda t: t.name) == sorted( [ - parent, condition, - condition_success_1, - condition_success_2, - condition_fail, + condition_pre_task_1, + condition_pre_task_2, + condition_pre_task_3, + end, ], key=lambda t: t.name, ) # Task dep test - assert parent._downstream_task_codes == {condition.code} - assert condition._upstream_task_codes == {parent.code} + assert end._upstream_task_codes == {condition.code} + assert condition._downstream_task_codes == {end.code} # Condition task dep after ProcessDefinition function get_define called - assert condition._downstream_task_codes == { - condition_success_1.code, - condition_success_2.code, - condition_fail.code, + assert condition._upstream_task_codes == { + condition_pre_task_1.code, + condition_pre_task_2.code, + condition_pre_task_3.code, } assert all( [ - child._upstream_task_codes == {condition.code} - for child in [condition_success_1, condition_success_2, condition_fail] + child._downstream_task_codes == {condition.code} + for child in [ + condition_pre_task_1, + condition_pre_task_2, + condition_pre_task_3, + ] ] )