From a2cd2a99e65c3a85bdebfc37e1465e98b2d4431a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=AE=B6=E5=90=8D?= <13774486042@163.com> Date: Mon, 16 May 2022 19:51:41 +0800 Subject: [PATCH] [python] Fix pre_submit_check error when local params in tasks (#10032) --- .../core/process_definition.py | 6 ++-- .../src/pydolphinscheduler/tasks/switch.py | 6 +++- .../tests/core/test_process_definition.py | 29 ++++++++++++++++++- 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index 649e2ce51a..dbf2c41795 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -350,14 +350,16 @@ class ProcessDefinition(Base): This method should be called before process definition submit to java gateway For now, we have below checker: - * `self.param` should be set if task `switch` in this workflow. + * `self.param` or at least one local param of task should be set if task `switch` in this workflow. """ if ( any([task.task_type == TaskType.SWITCH for task in self.tasks.values()]) and self.param is None + and all([len(task.local_params) == 0 for task in self.tasks.values()]) ): raise PyDSParamException( - "Parameter param must be provider if task Switch in process definition." + "Parameter param or at least one local_param of task must " + "be provider if task Switch in process definition." ) def submit(self) -> int: diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py index 28032f88e7..0c9a2b82b3 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py @@ -129,7 +129,11 @@ class SwitchCondition(Base): class Switch(Task): - """Task switch object, declare behavior for switch task to dolphinscheduler.""" + """Task switch object, declare behavior for switch task to dolphinscheduler. + + Param of process definition or at least one local param of task must be set + if task `switch` in this workflow. + """ def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs): super().__init__(name, TaskType.SWITCH, *args, **kwargs) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index 88028f72c7..36e1cb035e 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -240,11 +240,38 @@ def test__pre_submit_check_switch_without_param(mock_code_version): parent >> switch with pytest.raises( PyDSParamException, - match="Parameter param must be provider if task Switch in process definition.", + match="Parameter param or at least one local_param of task must " + "be provider if task Switch in process definition.", ): pd._pre_submit_check() +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test__pre_submit_check_switch_with_local_params(mock_code_version): + """Test :func:`_pre_submit_check` if process definition with switch with local params of task.""" + with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: + parent = Task( + name="parent", + task_type=TEST_TASK_TYPE, + local_params=[ + {"prop": "var", "direct": "OUT", "type": "VARCHAR", "value": ""} + ], + ) + switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE) + switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE) + switch_condition = SwitchCondition( + Branch(condition="${var} > 1", task=switch_child_1), + Default(task=switch_child_2), + ) + + switch = Switch(name="switch", condition=switch_condition) + parent >> switch + pd._pre_submit_check() + + def test_process_definition_get_define_without_task(): """Test process definition function get_define without task.""" expect = {