Browse Source

[python] Fix task condition set wrong deps (#7650)

After #7505 merged. we could use condition task type
but our dependent set in the wrong direction, all
the condition operators should be upstream of the
current task instead of downstream

fix: #7649
3.0.0/version-upgrade
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
cc8fbe3e14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
  2. 8
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
  3. 45
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py

29
dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py

@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and
condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream
automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow
like:
--> condition_success_1
/
parent -> conditions -> --> condition_success_2
\
--> condition_fail
pre_task_success_1 ->
\
pre_task_success_2 -> --> conditions -> end
/
pre_task_fail ->
.
"""
@ -35,21 +35,22 @@ from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd:
parent = Shell(name="parent", command="echo parent")
condition_success_1 = Shell(
name="condition_success_1", command="echo condition_success_1"
condition_pre_task_1 = Shell(
name="pre_task_success_1", command="echo pre_task_success_1"
)
condition_success_2 = Shell(
name="condition_success_2", command="echo condition_success_2"
condition_pre_task_2 = Shell(
name="pre_task_success_2", command="echo pre_task_success_2"
)
condition_fail = Shell(name="condition_fail", command="echo condition_fail")
condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail")
cond_operator = And(
And(
SUCCESS(condition_success_1, condition_success_2),
FAILURE(condition_fail),
SUCCESS(condition_pre_task_1, condition_pre_task_2),
FAILURE(condition_pre_task_3),
),
)
end = Shell(name="end", command="echo parent")
condition = Conditions(name="conditions", condition=cond_operator)
parent >> condition
condition >> end
pd.submit()

8
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py

@ -164,13 +164,13 @@ class Conditions(Task):
self._set_dep()
def _set_dep(self) -> None:
"""Set downstream according to parameter `condition`."""
downstream = []
"""Set upstream according to parameter `condition`."""
upstream = []
for cond in self.condition.args:
if isinstance(cond, ConditionOperator):
for status in cond.args:
downstream.extend(list(status.tasks))
self.set_downstream(downstream)
upstream.extend(list(status.tasks))
self.set_upstream(upstream)
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:

45
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py

@ -396,44 +396,49 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
def test_condition_set_dep_workflow(mock_task_code_version):
"""Test task condition set dependence in workflow level."""
with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
parent = Task(name="parent", task_type=TEST_TYPE)
condition_success_1 = Task(name="condition_success_1", task_type=TEST_TYPE)
condition_success_2 = Task(name="condition_success_2", task_type=TEST_TYPE)
condition_fail = Task(name="condition_fail", task_type=TEST_TYPE)
condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE)
condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE)
condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
cond_operator = And(
And(
SUCCESS(condition_success_1, condition_success_2),
FAILURE(condition_fail),
SUCCESS(condition_pre_task_1, condition_pre_task_2),
FAILURE(condition_pre_task_3),
),
)
end = Task(name="end", task_type=TEST_TYPE)
condition = Conditions(name="conditions", condition=cond_operator)
condition >> end
condition = Conditions(name=TEST_NAME, condition=cond_operator)
parent >> condition
# General tasks test
assert len(pd.tasks) == 5
assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
[
parent,
condition,
condition_success_1,
condition_success_2,
condition_fail,
condition_pre_task_1,
condition_pre_task_2,
condition_pre_task_3,
end,
],
key=lambda t: t.name,
)
# Task dep test
assert parent._downstream_task_codes == {condition.code}
assert condition._upstream_task_codes == {parent.code}
assert end._upstream_task_codes == {condition.code}
assert condition._downstream_task_codes == {end.code}
# Condition task dep after ProcessDefinition function get_define called
assert condition._downstream_task_codes == {
condition_success_1.code,
condition_success_2.code,
condition_fail.code,
assert condition._upstream_task_codes == {
condition_pre_task_1.code,
condition_pre_task_2.code,
condition_pre_task_3.code,
}
assert all(
[
child._upstream_task_codes == {condition.code}
for child in [condition_success_1, condition_success_2, condition_fail]
child._downstream_task_codes == {condition.code}
for child in [
condition_pre_task_1,
condition_pre_task_2,
condition_pre_task_3,
]
]
)

Loading…
Cancel
Save