Browse Source

[python] Fix task condition set wrong deps (#7650) (#7655)

After #7505 merged. we could use condition task type
but our dependent set in the wrong direction, all
the condition operators should be upstream of the
current task instead of downstream

fix: #7649
2.0.7-release
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
d23b3444bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
  2. 8
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
  3. 45
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py

29
dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py

@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and
condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream
automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow
like: like:
--> condition_success_1 pre_task_success_1 ->
/ \
parent -> conditions -> --> condition_success_2 pre_task_success_2 -> --> conditions -> end
\ /
--> condition_fail pre_task_fail ->
. .
""" """
@ -35,21 +35,22 @@ from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions
from pydolphinscheduler.tasks.shell import Shell from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd: with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd:
parent = Shell(name="parent", command="echo parent") condition_pre_task_1 = Shell(
condition_success_1 = Shell( name="pre_task_success_1", command="echo pre_task_success_1"
name="condition_success_1", command="echo condition_success_1"
) )
condition_success_2 = Shell( condition_pre_task_2 = Shell(
name="condition_success_2", command="echo condition_success_2" name="pre_task_success_2", command="echo pre_task_success_2"
) )
condition_fail = Shell(name="condition_fail", command="echo condition_fail") condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail")
cond_operator = And( cond_operator = And(
And( And(
SUCCESS(condition_success_1, condition_success_2), SUCCESS(condition_pre_task_1, condition_pre_task_2),
FAILURE(condition_fail), FAILURE(condition_pre_task_3),
), ),
) )
end = Shell(name="end", command="echo parent")
condition = Conditions(name="conditions", condition=cond_operator) condition = Conditions(name="conditions", condition=cond_operator)
parent >> condition condition >> end
pd.submit() pd.submit()

8
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py

@ -164,13 +164,13 @@ class Conditions(Task):
self._set_dep() self._set_dep()
def _set_dep(self) -> None: def _set_dep(self) -> None:
"""Set downstream according to parameter `condition`.""" """Set upstream according to parameter `condition`."""
downstream = [] upstream = []
for cond in self.condition.args: for cond in self.condition.args:
if isinstance(cond, ConditionOperator): if isinstance(cond, ConditionOperator):
for status in cond.args: for status in cond.args:
downstream.extend(list(status.tasks)) upstream.extend(list(status.tasks))
self.set_downstream(downstream) self.set_upstream(upstream)
@property @property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:

45
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py

@ -396,44 +396,49 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
def test_condition_set_dep_workflow(mock_task_code_version): def test_condition_set_dep_workflow(mock_task_code_version):
"""Test task condition set dependence in workflow level.""" """Test task condition set dependence in workflow level."""
with ProcessDefinition(name="test-condition-set-dep-workflow") as pd: with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
parent = Task(name="parent", task_type=TEST_TYPE) condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE)
condition_success_1 = Task(name="condition_success_1", task_type=TEST_TYPE) condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE)
condition_success_2 = Task(name="condition_success_2", task_type=TEST_TYPE) condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
condition_fail = Task(name="condition_fail", task_type=TEST_TYPE)
cond_operator = And( cond_operator = And(
And( And(
SUCCESS(condition_success_1, condition_success_2), SUCCESS(condition_pre_task_1, condition_pre_task_2),
FAILURE(condition_fail), FAILURE(condition_pre_task_3),
), ),
) )
end = Task(name="end", task_type=TEST_TYPE)
condition = Conditions(name="conditions", condition=cond_operator)
condition >> end
condition = Conditions(name=TEST_NAME, condition=cond_operator)
parent >> condition
# General tasks test # General tasks test
assert len(pd.tasks) == 5 assert len(pd.tasks) == 5
assert sorted(pd.task_list, key=lambda t: t.name) == sorted( assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
[ [
parent,
condition, condition,
condition_success_1, condition_pre_task_1,
condition_success_2, condition_pre_task_2,
condition_fail, condition_pre_task_3,
end,
], ],
key=lambda t: t.name, key=lambda t: t.name,
) )
# Task dep test # Task dep test
assert parent._downstream_task_codes == {condition.code} assert end._upstream_task_codes == {condition.code}
assert condition._upstream_task_codes == {parent.code} assert condition._downstream_task_codes == {end.code}
# Condition task dep after ProcessDefinition function get_define called # Condition task dep after ProcessDefinition function get_define called
assert condition._downstream_task_codes == { assert condition._upstream_task_codes == {
condition_success_1.code, condition_pre_task_1.code,
condition_success_2.code, condition_pre_task_2.code,
condition_fail.code, condition_pre_task_3.code,
} }
assert all( assert all(
[ [
child._upstream_task_codes == {condition.code} child._downstream_task_codes == {condition.code}
for child in [condition_success_1, condition_success_2, condition_fail] for child in [
condition_pre_task_1,
condition_pre_task_2,
condition_pre_task_3,
]
] ]
) )

Loading…
Cancel
Save