Browse Source

[python] Add task switch (#7531)

* [python] Add task switch

close: #6928

* Fix code style
3.0.0/version-upgrade
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
946a0c7c57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 51
      dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
  2. 1
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  3. 158
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
  4. 300
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

51
dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py

@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
r"""
A example workflow for task switch.
This example will create four task in single workflow, with three shell task and one switch task. Task switch
have one upstream which we declare explicit with syntax `parent >> switch`, and two downstream automatically
set dependence by switch task by passing parameter `condition`. The graph of this workflow like:
--> switch_child_1
/
parent -> switch ->
\
--> switch_child_2
.
"""
from tasks.switch import Branch, Default, Switch, SwitchCondition
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(
name="task_dependent_external",
tenant="tenant_exists",
) as pd:
parent = Shell(name="parent", command="echo parent")
switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2")
switch_condition = SwitchCondition(
Branch(condition="${var} > 1", task=switch_child_1),
Default(task=switch_child_2),
)
switch = Switch(name="switch", condition=switch_condition)
parent >> switch
pd.submit()

1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -76,6 +76,7 @@ class TaskType(str):
DATAX = "DATAX"
DEPENDENT = "DEPENDENT"
CONDITIONS = "CONDITIONS"
SWITCH = "SWITCH"
class DefaultTaskCodeNum(str):

158
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py

@ -0,0 +1,158 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task Switch."""
from typing import Dict, Optional
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException
class SwitchBranch(Base):
"""Base class of ConditionBranch of task switch.
It a parent class for :class:`Branch` and :class:`Default`.
"""
_DEFINE_ATTR = {
"next_node",
}
def __init__(self, task: Task, exp: Optional[str] = None):
super().__init__(f"Switch.{self.__class__.__name__.upper()}")
self.task = task
self.exp = exp
@property
def next_node(self) -> str:
"""Get task switch property next_node, it return task code when init class switch."""
return self.task.code
@property
def condition(self) -> Optional[str]:
"""Get task switch property condition."""
return self.exp
def get_define(self, camel_attr: bool = True) -> Dict:
"""Get :class:`ConditionBranch` definition attribute communicate to Java gateway server."""
if self.condition:
self._DEFINE_ATTR.add("condition")
return super().get_define()
class Branch(SwitchBranch):
"""Common condition branch for switch task.
If any condition in :class:`Branch` match, would set this :class:`Branch`'s task as downstream of task
switch. If all condition branch do not match would set :class:`Default`'s task as task switch downstream.
"""
def __init__(self, condition: str, task: Task):
super().__init__(task, condition)
class Default(SwitchBranch):
"""Class default branch for switch task.
If all condition of :class:`Branch` do not match, task switch would run the tasks in :class:`Default`
and set :class:`Default`'s task as switch downstream. Please notice that each switch condition
could only have one single :class:`Default`.
"""
def __init__(self, task: Task):
super().__init__(task)
class SwitchCondition(Base):
"""Set switch condition of given parameter."""
_DEFINE_ATTR = {
"depend_task_list",
}
def __init__(self, *args):
super().__init__(self.__class__.__name__)
self.args = args
def set_define_attr(self) -> None:
"""Set attribute to function :func:`get_define`.
It is a wrapper for both `And` and `Or` operator.
"""
result = []
num_branch_default = 0
for condition in self.args:
if isinstance(condition, SwitchBranch):
if num_branch_default < 1:
if isinstance(condition, Default):
self._DEFINE_ATTR.add("next_node")
setattr(self, "next_node", condition.next_node)
num_branch_default += 1
elif isinstance(condition, Branch):
result.append(condition.get_define())
else:
raise PyDSParamException(
"Task Switch's parameter only support exactly one default branch."
)
else:
raise PyDSParamException(
"Task Switch's parameter only support SwitchBranch but got %s.",
type(condition),
)
# Handle switch default branch, default value is `""` if not provide.
if num_branch_default == 0:
self._DEFINE_ATTR.add("next_node")
setattr(self, "next_node", "")
setattr(self, "depend_task_list", result)
def get_define(self, camel_attr=True) -> Dict:
"""Overwrite Base.get_define to get task Condition specific get define."""
self.set_define_attr()
return super().get_define()
class Switch(Task):
"""Task switch object, declare behavior for switch task to dolphinscheduler."""
def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
super().__init__(name, TaskType.SWITCH, *args, **kwargs)
self.condition = condition
# Set condition tasks as current task downstream
self._set_dep()
def _set_dep(self) -> None:
"""Set downstream according to parameter `condition`."""
downstream = []
for condition in self.condition.args:
if isinstance(condition, SwitchBranch):
downstream.append(condition.task)
self.set_downstream(downstream)
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
"""Override Task.task_params for switch task.
switch task have some specials attribute `switch`, and in most of the task
this attribute is None and use empty dict `{}` as default value. We do not use class
attribute `_task_custom_attr` due to avoid attribute cover.
"""
params = super().task_params
params["switchResult"] = self.condition.get_define()
return params

300
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

@ -0,0 +1,300 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test Task switch."""
from typing import Optional, Tuple
from unittest.mock import patch
import pytest
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.switch import (
Branch,
Default,
Switch,
SwitchBranch,
SwitchCondition,
)
from tests.testing.task import Task
TEST_NAME = "test-task"
TEST_TYPE = "test-type"
def task_switch_arg_wrapper(obj, task: Task, exp: Optional[str] = None) -> SwitchBranch:
"""Wrap task switch and its subclass."""
if obj is Default:
return obj(task)
elif obj is Branch:
return obj(exp, task)
else:
return obj(task, exp)
@pytest.mark.parametrize(
"obj",
[
SwitchBranch,
Branch,
Default,
],
)
def test_switch_branch_attr_next_node(obj: SwitchBranch):
"""Test get attribute from class switch branch."""
task = Task(name=TEST_NAME, task_type=TEST_TYPE)
switch_branch = task_switch_arg_wrapper(obj, task=task, exp="unittest")
assert switch_branch.next_node == task.code
@pytest.mark.parametrize(
"obj",
[
SwitchBranch,
Default,
],
)
def test_switch_branch_get_define_without_condition(obj: SwitchBranch):
"""Test function :func:`get_define` with None value of attribute condition from class switch branch."""
task = Task(name=TEST_NAME, task_type=TEST_TYPE)
expect = {"nextNode": task.code}
switch_branch = task_switch_arg_wrapper(obj, task=task)
assert switch_branch.get_define() == expect
@pytest.mark.parametrize(
"obj",
[
SwitchBranch,
Branch,
],
)
def test_switch_branch_get_define_condition(obj: SwitchBranch):
"""Test function :func:`get_define` with specific attribute condition from class switch branch."""
task = Task(name=TEST_NAME, task_type=TEST_TYPE)
exp = "${var} == 1"
expect = {
"nextNode": task.code,
"condition": exp,
}
switch_branch = task_switch_arg_wrapper(obj, task=task, exp=exp)
assert switch_branch.get_define() == expect
@pytest.mark.parametrize(
"args, msg",
[
(
(1,),
".*?parameter only support SwitchBranch but got.*?",
),
(
(Default(Task(TEST_NAME, TEST_TYPE)), 2),
".*?parameter only support SwitchBranch but got.*?",
),
(
(Default(Task(TEST_NAME, TEST_TYPE)), Default(Task(TEST_NAME, TEST_TYPE))),
".*?parameter only support exactly one default branch",
),
(
(
Branch(condition="unittest", task=Task(TEST_NAME, TEST_TYPE)),
Default(Task(TEST_NAME, TEST_TYPE)),
Default(Task(TEST_NAME, TEST_TYPE)),
),
".*?parameter only support exactly one default branch",
),
],
)
def test_switch_condition_set_define_attr_error(args: Tuple, msg: str):
"""Test error case on :class:`SwitchCondition`."""
switch_condition = SwitchCondition(*args)
with pytest.raises(PyDSParamException, match=msg):
switch_condition.set_define_attr()
def test_switch_condition_set_define_attr_default():
"""Test set :class:`Default` to attribute on :class:`SwitchCondition`."""
task = Task(TEST_NAME, TEST_TYPE)
switch_condition = SwitchCondition(Default(task))
switch_condition.set_define_attr()
assert getattr(switch_condition, "next_node") == task.code
assert getattr(switch_condition, "depend_task_list") == []
def test_switch_condition_set_define_attr_branch():
"""Test set :class:`Branch` to attribute on :class:`SwitchCondition`."""
task = Task(TEST_NAME, TEST_TYPE)
switch_condition = SwitchCondition(
Branch("unittest1", task), Branch("unittest2", task)
)
expect = [
{"condition": "unittest1", "nextNode": task.code},
{"condition": "unittest2", "nextNode": task.code},
]
switch_condition.set_define_attr()
assert getattr(switch_condition, "next_node") == ""
assert getattr(switch_condition, "depend_task_list") == expect
def test_switch_condition_set_define_attr_mix_branch_and_default():
"""Test set bot :class:`Branch` and :class:`Default` to attribute on :class:`SwitchCondition`."""
task = Task(TEST_NAME, TEST_TYPE)
switch_condition = SwitchCondition(
Branch("unittest1", task), Branch("unittest2", task), Default(task)
)
expect = [
{"condition": "unittest1", "nextNode": task.code},
{"condition": "unittest2", "nextNode": task.code},
]
switch_condition.set_define_attr()
assert getattr(switch_condition, "next_node") == task.code
assert getattr(switch_condition, "depend_task_list") == expect
def test_switch_condition_get_define_default():
"""Test function :func:`get_define` with :class:`Default` in :class:`SwitchCondition`."""
task = Task(TEST_NAME, TEST_TYPE)
switch_condition = SwitchCondition(Default(task))
expect = {
"dependTaskList": [],
"nextNode": task.code,
}
assert switch_condition.get_define() == expect
def test_switch_condition_get_define_branch():
"""Test function :func:`get_define` with :class:`Branch` in :class:`SwitchCondition`."""
task = Task(TEST_NAME, TEST_TYPE)
switch_condition = SwitchCondition(
Branch("unittest1", task), Branch("unittest2", task)
)
expect = {
"dependTaskList": [
{"condition": "unittest1", "nextNode": task.code},
{"condition": "unittest2", "nextNode": task.code},
],
"nextNode": "",
}
assert switch_condition.get_define() == expect
def test_switch_condition_get_define_mix_branch_and_default():
"""Test function :func:`get_define` with both :class:`Branch` and :class:`Default`."""
task = Task(TEST_NAME, TEST_TYPE)
switch_condition = SwitchCondition(
Branch("unittest1", task), Branch("unittest2", task), Default(task)
)
expect = {
"dependTaskList": [
{"condition": "unittest1", "nextNode": task.code},
{"condition": "unittest2", "nextNode": task.code},
],
"nextNode": task.code,
}
assert switch_condition.get_define() == expect
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_switch_get_define(mock_task_code_version):
"""Test task switch :func:`get_define`."""
task = Task(name=TEST_NAME, task_type=TEST_TYPE)
switch_condition = SwitchCondition(
Branch(condition="${var1} > 1", task=task),
Branch(condition="${var1} <= 1", task=task),
Default(task),
)
name = "test_switch_get_define"
expect = {
"code": 123,
"name": name,
"version": 1,
"description": None,
"delayTime": 0,
"taskType": "SWITCH",
"taskParams": {
"resourceList": [],
"localParams": [],
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
"switchResult": {
"dependTaskList": [
{"condition": "${var1} > 1", "nextNode": task.code},
{"condition": "${var1} <= 1", "nextNode": task.code},
],
"nextNode": task.code,
},
},
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0,
}
task = Switch(name, condition=switch_condition)
assert task.get_define() == expect
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_switch_set_dep_workflow(mock_task_code_version):
"""Test task switch set dependence in workflow level."""
with ProcessDefinition(name="test-switch-set-dep-workflow") as pd:
parent = Task(name="parent", task_type=TEST_TYPE)
switch_child_1 = Task(name="switch_child_1", task_type=TEST_TYPE)
switch_child_2 = Task(name="switch_child_2", task_type=TEST_TYPE)
switch_condition = SwitchCondition(
Branch(condition="${var} > 1", task=switch_child_1),
Default(task=switch_child_2),
)
switch = Switch(name=TEST_NAME, condition=switch_condition)
parent >> switch
# General tasks test
assert len(pd.tasks) == 4
assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
[parent, switch, switch_child_1, switch_child_2], key=lambda t: t.name
)
# Task dep test
assert parent._downstream_task_codes == {switch.code}
assert switch._upstream_task_codes == {parent.code}
# Switch task dep after ProcessDefinition function get_define called
assert switch._downstream_task_codes == {
switch_child_1.code,
switch_child_2.code,
}
assert all(
[
child._upstream_task_codes == {switch.code}
for child in [switch_child_1, switch_child_2]
]
)
Loading…
Cancel
Save