diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py new file mode 100644 index 0000000000..418d5692e6 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +r""" +A example workflow for task switch. + +This example will create four task in single workflow, with three shell task and one switch task. Task switch +have one upstream which we declare explicit with syntax `parent >> switch`, and two downstream automatically +set dependence by switch task by passing parameter `condition`. The graph of this workflow like: + --> switch_child_1 + / +parent -> switch -> + \ + --> switch_child_2 +. +""" + +from tasks.switch import Branch, Default, Switch, SwitchCondition + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.tasks.shell import Shell + +with ProcessDefinition( + name="task_dependent_external", + tenant="tenant_exists", +) as pd: + parent = Shell(name="parent", command="echo parent") + switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1") + switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2") + switch_condition = SwitchCondition( + Branch(condition="${var} > 1", task=switch_child_1), + Default(task=switch_child_2), + ) + + switch = Switch(name="switch", condition=switch_condition) + parent >> switch + pd.submit() diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index 65ab6ca36e..940c74920f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -76,6 +76,7 @@ class TaskType(str): DATAX = "DATAX" DEPENDENT = "DEPENDENT" CONDITIONS = "CONDITIONS" + SWITCH = "SWITCH" class DefaultTaskCodeNum(str): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py new file mode 100644 index 0000000000..28032f88e7 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py @@ -0,0 +1,158 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task Switch.""" + +from typing import Dict, Optional + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.base import Base +from pydolphinscheduler.core.task import Task +from pydolphinscheduler.exceptions import PyDSParamException + + +class SwitchBranch(Base): + """Base class of ConditionBranch of task switch. + + It a parent class for :class:`Branch` and :class:`Default`. + """ + + _DEFINE_ATTR = { + "next_node", + } + + def __init__(self, task: Task, exp: Optional[str] = None): + super().__init__(f"Switch.{self.__class__.__name__.upper()}") + self.task = task + self.exp = exp + + @property + def next_node(self) -> str: + """Get task switch property next_node, it return task code when init class switch.""" + return self.task.code + + @property + def condition(self) -> Optional[str]: + """Get task switch property condition.""" + return self.exp + + def get_define(self, camel_attr: bool = True) -> Dict: + """Get :class:`ConditionBranch` definition attribute communicate to Java gateway server.""" + if self.condition: + self._DEFINE_ATTR.add("condition") + return super().get_define() + + +class Branch(SwitchBranch): + """Common condition branch for switch task. + + If any condition in :class:`Branch` match, would set this :class:`Branch`'s task as downstream of task + switch. If all condition branch do not match would set :class:`Default`'s task as task switch downstream. + """ + + def __init__(self, condition: str, task: Task): + super().__init__(task, condition) + + +class Default(SwitchBranch): + """Class default branch for switch task. + + If all condition of :class:`Branch` do not match, task switch would run the tasks in :class:`Default` + and set :class:`Default`'s task as switch downstream. Please notice that each switch condition + could only have one single :class:`Default`. + """ + + def __init__(self, task: Task): + super().__init__(task) + + +class SwitchCondition(Base): + """Set switch condition of given parameter.""" + + _DEFINE_ATTR = { + "depend_task_list", + } + + def __init__(self, *args): + super().__init__(self.__class__.__name__) + self.args = args + + def set_define_attr(self) -> None: + """Set attribute to function :func:`get_define`. + + It is a wrapper for both `And` and `Or` operator. + """ + result = [] + num_branch_default = 0 + for condition in self.args: + if isinstance(condition, SwitchBranch): + if num_branch_default < 1: + if isinstance(condition, Default): + self._DEFINE_ATTR.add("next_node") + setattr(self, "next_node", condition.next_node) + num_branch_default += 1 + elif isinstance(condition, Branch): + result.append(condition.get_define()) + else: + raise PyDSParamException( + "Task Switch's parameter only support exactly one default branch." + ) + else: + raise PyDSParamException( + "Task Switch's parameter only support SwitchBranch but got %s.", + type(condition), + ) + # Handle switch default branch, default value is `""` if not provide. + if num_branch_default == 0: + self._DEFINE_ATTR.add("next_node") + setattr(self, "next_node", "") + setattr(self, "depend_task_list", result) + + def get_define(self, camel_attr=True) -> Dict: + """Overwrite Base.get_define to get task Condition specific get define.""" + self.set_define_attr() + return super().get_define() + + +class Switch(Task): + """Task switch object, declare behavior for switch task to dolphinscheduler.""" + + def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs): + super().__init__(name, TaskType.SWITCH, *args, **kwargs) + self.condition = condition + # Set condition tasks as current task downstream + self._set_dep() + + def _set_dep(self) -> None: + """Set downstream according to parameter `condition`.""" + downstream = [] + for condition in self.condition.args: + if isinstance(condition, SwitchBranch): + downstream.append(condition.task) + self.set_downstream(downstream) + + @property + def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: + """Override Task.task_params for switch task. + + switch task have some specials attribute `switch`, and in most of the task + this attribute is None and use empty dict `{}` as default value. We do not use class + attribute `_task_custom_attr` due to avoid attribute cover. + """ + params = super().task_params + params["switchResult"] = self.condition.get_define() + return params diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py new file mode 100644 index 0000000000..1f6ff5bfa2 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py @@ -0,0 +1,300 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task switch.""" + +from typing import Optional, Tuple +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.exceptions import PyDSParamException +from pydolphinscheduler.tasks.switch import ( + Branch, + Default, + Switch, + SwitchBranch, + SwitchCondition, +) +from tests.testing.task import Task + +TEST_NAME = "test-task" +TEST_TYPE = "test-type" + + +def task_switch_arg_wrapper(obj, task: Task, exp: Optional[str] = None) -> SwitchBranch: + """Wrap task switch and its subclass.""" + if obj is Default: + return obj(task) + elif obj is Branch: + return obj(exp, task) + else: + return obj(task, exp) + + +@pytest.mark.parametrize( + "obj", + [ + SwitchBranch, + Branch, + Default, + ], +) +def test_switch_branch_attr_next_node(obj: SwitchBranch): + """Test get attribute from class switch branch.""" + task = Task(name=TEST_NAME, task_type=TEST_TYPE) + switch_branch = task_switch_arg_wrapper(obj, task=task, exp="unittest") + assert switch_branch.next_node == task.code + + +@pytest.mark.parametrize( + "obj", + [ + SwitchBranch, + Default, + ], +) +def test_switch_branch_get_define_without_condition(obj: SwitchBranch): + """Test function :func:`get_define` with None value of attribute condition from class switch branch.""" + task = Task(name=TEST_NAME, task_type=TEST_TYPE) + expect = {"nextNode": task.code} + switch_branch = task_switch_arg_wrapper(obj, task=task) + assert switch_branch.get_define() == expect + + +@pytest.mark.parametrize( + "obj", + [ + SwitchBranch, + Branch, + ], +) +def test_switch_branch_get_define_condition(obj: SwitchBranch): + """Test function :func:`get_define` with specific attribute condition from class switch branch.""" + task = Task(name=TEST_NAME, task_type=TEST_TYPE) + exp = "${var} == 1" + expect = { + "nextNode": task.code, + "condition": exp, + } + switch_branch = task_switch_arg_wrapper(obj, task=task, exp=exp) + assert switch_branch.get_define() == expect + + +@pytest.mark.parametrize( + "args, msg", + [ + ( + (1,), + ".*?parameter only support SwitchBranch but got.*?", + ), + ( + (Default(Task(TEST_NAME, TEST_TYPE)), 2), + ".*?parameter only support SwitchBranch but got.*?", + ), + ( + (Default(Task(TEST_NAME, TEST_TYPE)), Default(Task(TEST_NAME, TEST_TYPE))), + ".*?parameter only support exactly one default branch", + ), + ( + ( + Branch(condition="unittest", task=Task(TEST_NAME, TEST_TYPE)), + Default(Task(TEST_NAME, TEST_TYPE)), + Default(Task(TEST_NAME, TEST_TYPE)), + ), + ".*?parameter only support exactly one default branch", + ), + ], +) +def test_switch_condition_set_define_attr_error(args: Tuple, msg: str): + """Test error case on :class:`SwitchCondition`.""" + switch_condition = SwitchCondition(*args) + with pytest.raises(PyDSParamException, match=msg): + switch_condition.set_define_attr() + + +def test_switch_condition_set_define_attr_default(): + """Test set :class:`Default` to attribute on :class:`SwitchCondition`.""" + task = Task(TEST_NAME, TEST_TYPE) + switch_condition = SwitchCondition(Default(task)) + switch_condition.set_define_attr() + assert getattr(switch_condition, "next_node") == task.code + assert getattr(switch_condition, "depend_task_list") == [] + + +def test_switch_condition_set_define_attr_branch(): + """Test set :class:`Branch` to attribute on :class:`SwitchCondition`.""" + task = Task(TEST_NAME, TEST_TYPE) + switch_condition = SwitchCondition( + Branch("unittest1", task), Branch("unittest2", task) + ) + expect = [ + {"condition": "unittest1", "nextNode": task.code}, + {"condition": "unittest2", "nextNode": task.code}, + ] + + switch_condition.set_define_attr() + assert getattr(switch_condition, "next_node") == "" + assert getattr(switch_condition, "depend_task_list") == expect + + +def test_switch_condition_set_define_attr_mix_branch_and_default(): + """Test set bot :class:`Branch` and :class:`Default` to attribute on :class:`SwitchCondition`.""" + task = Task(TEST_NAME, TEST_TYPE) + switch_condition = SwitchCondition( + Branch("unittest1", task), Branch("unittest2", task), Default(task) + ) + expect = [ + {"condition": "unittest1", "nextNode": task.code}, + {"condition": "unittest2", "nextNode": task.code}, + ] + + switch_condition.set_define_attr() + assert getattr(switch_condition, "next_node") == task.code + assert getattr(switch_condition, "depend_task_list") == expect + + +def test_switch_condition_get_define_default(): + """Test function :func:`get_define` with :class:`Default` in :class:`SwitchCondition`.""" + task = Task(TEST_NAME, TEST_TYPE) + switch_condition = SwitchCondition(Default(task)) + expect = { + "dependTaskList": [], + "nextNode": task.code, + } + assert switch_condition.get_define() == expect + + +def test_switch_condition_get_define_branch(): + """Test function :func:`get_define` with :class:`Branch` in :class:`SwitchCondition`.""" + task = Task(TEST_NAME, TEST_TYPE) + switch_condition = SwitchCondition( + Branch("unittest1", task), Branch("unittest2", task) + ) + expect = { + "dependTaskList": [ + {"condition": "unittest1", "nextNode": task.code}, + {"condition": "unittest2", "nextNode": task.code}, + ], + "nextNode": "", + } + assert switch_condition.get_define() == expect + + +def test_switch_condition_get_define_mix_branch_and_default(): + """Test function :func:`get_define` with both :class:`Branch` and :class:`Default`.""" + task = Task(TEST_NAME, TEST_TYPE) + switch_condition = SwitchCondition( + Branch("unittest1", task), Branch("unittest2", task), Default(task) + ) + expect = { + "dependTaskList": [ + {"condition": "unittest1", "nextNode": task.code}, + {"condition": "unittest2", "nextNode": task.code}, + ], + "nextNode": task.code, + } + assert switch_condition.get_define() == expect + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_switch_get_define(mock_task_code_version): + """Test task switch :func:`get_define`.""" + task = Task(name=TEST_NAME, task_type=TEST_TYPE) + switch_condition = SwitchCondition( + Branch(condition="${var1} > 1", task=task), + Branch(condition="${var1} <= 1", task=task), + Default(task), + ) + + name = "test_switch_get_define" + expect = { + "code": 123, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "SWITCH", + "taskParams": { + "resourceList": [], + "localParams": [], + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + "switchResult": { + "dependTaskList": [ + {"condition": "${var1} > 1", "nextNode": task.code}, + {"condition": "${var1} <= 1", "nextNode": task.code}, + ], + "nextNode": task.code, + }, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + + task = Switch(name, condition=switch_condition) + assert task.get_define() == expect + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_switch_set_dep_workflow(mock_task_code_version): + """Test task switch set dependence in workflow level.""" + with ProcessDefinition(name="test-switch-set-dep-workflow") as pd: + parent = Task(name="parent", task_type=TEST_TYPE) + switch_child_1 = Task(name="switch_child_1", task_type=TEST_TYPE) + switch_child_2 = Task(name="switch_child_2", task_type=TEST_TYPE) + switch_condition = SwitchCondition( + Branch(condition="${var} > 1", task=switch_child_1), + Default(task=switch_child_2), + ) + + switch = Switch(name=TEST_NAME, condition=switch_condition) + parent >> switch + # General tasks test + assert len(pd.tasks) == 4 + assert sorted(pd.task_list, key=lambda t: t.name) == sorted( + [parent, switch, switch_child_1, switch_child_2], key=lambda t: t.name + ) + # Task dep test + assert parent._downstream_task_codes == {switch.code} + assert switch._upstream_task_codes == {parent.code} + + # Switch task dep after ProcessDefinition function get_define called + assert switch._downstream_task_codes == { + switch_child_1.code, + switch_child_2.code, + } + assert all( + [ + child._upstream_task_codes == {switch.code} + for child in [switch_child_1, switch_child_2] + ] + )