Browse Source

[python] Fix pre_submit_check error when local params in tasks (#10032)

3.1.0-release
陈家名 3 years ago committed by GitHub
parent
commit
a2cd2a99e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  2. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
  3. 29
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -350,14 +350,16 @@ class ProcessDefinition(Base):
This method should be called before process definition submit to java gateway
For now, we have below checker:
* `self.param` should be set if task `switch` in this workflow.
* `self.param` or at least one local param of task should be set if task `switch` in this workflow.
"""
if (
any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
and self.param is None
and all([len(task.local_params) == 0 for task in self.tasks.values()])
):
raise PyDSParamException(
"Parameter param must be provider if task Switch in process definition."
"Parameter param or at least one local_param of task must "
"be provider if task Switch in process definition."
)
def submit(self) -> int:

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py

@ -129,7 +129,11 @@ class SwitchCondition(Base):
class Switch(Task):
"""Task switch object, declare behavior for switch task to dolphinscheduler."""
"""Task switch object, declare behavior for switch task to dolphinscheduler.
Param of process definition or at least one local param of task must be set
if task `switch` in this workflow.
"""
def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
super().__init__(name, TaskType.SWITCH, *args, **kwargs)

29
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -240,11 +240,38 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
parent >> switch
with pytest.raises(
PyDSParamException,
match="Parameter param must be provider if task Switch in process definition.",
match="Parameter param or at least one local_param of task must "
"be provider if task Switch in process definition.",
):
pd._pre_submit_check()
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test__pre_submit_check_switch_with_local_params(mock_code_version):
"""Test :func:`_pre_submit_check` if process definition with switch with local params of task."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
parent = Task(
name="parent",
task_type=TEST_TASK_TYPE,
local_params=[
{"prop": "var", "direct": "OUT", "type": "VARCHAR", "value": ""}
],
)
switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
switch_condition = SwitchCondition(
Branch(condition="${var} > 1", task=switch_child_1),
Default(task=switch_child_2),
)
switch = Switch(name="switch", condition=switch_condition)
parent >> switch
pd._pre_submit_check()
def test_process_definition_get_define_without_task():
"""Test process definition function get_define without task."""
expect = {

Loading…
Cancel
Save