You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
169 lines
4.8 KiB
169 lines
4.8 KiB
# Licensed to the Apache Software Foundation (ASF) under one |
|
# or more contributor license agreements. See the NOTICE file |
|
# distributed with this work for additional information |
|
# regarding copyright ownership. The ASF licenses this file |
|
# to you under the Apache License, Version 2.0 (the |
|
# "License"); you may not use this file except in compliance |
|
# with the License. You may obtain a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, |
|
# software distributed under the License is distributed on an |
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
|
# KIND, either express or implied. See the License for the |
|
# specific language governing permissions and limitations |
|
# under the License. |
|
|
|
"""Test module about function wrap task decorator.""" |
|
|
|
from unittest.mock import patch |
|
|
|
import pytest |
|
|
|
from pydolphinscheduler.core.process_definition import ProcessDefinition |
|
from pydolphinscheduler.exceptions import PyDSParamException |
|
from pydolphinscheduler.tasks.func_wrap import task |
|
from tests.testing.decorator import foo as foo_decorator |
|
from tests.testing.task import Task |
|
|
|
PD_NAME = "test_process_definition" |
|
TASK_NAME = "test_task" |
|
|
|
|
|
@patch( |
|
"pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1) |
|
) |
|
def test_single_task_outside(mock_code): |
|
"""Test single decorator task which outside process definition.""" |
|
|
|
@task |
|
def foo(): |
|
print(TASK_NAME) |
|
|
|
with ProcessDefinition(PD_NAME) as pd: |
|
foo() |
|
|
|
assert pd is not None and pd.name == PD_NAME |
|
assert len(pd.tasks) == 1 |
|
|
|
pd_task = pd.tasks[12345] |
|
assert pd_task.name == "foo" |
|
assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()" |
|
|
|
|
|
@patch( |
|
"pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1) |
|
) |
|
def test_single_task_inside(mock_code): |
|
"""Test single decorator task which inside process definition.""" |
|
with ProcessDefinition(PD_NAME) as pd: |
|
|
|
@task |
|
def foo(): |
|
print(TASK_NAME) |
|
|
|
foo() |
|
|
|
assert pd is not None and pd.name == PD_NAME |
|
assert len(pd.tasks) == 1 |
|
|
|
pd_task = pd.tasks[12345] |
|
assert pd_task.name == "foo" |
|
assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()" |
|
|
|
|
|
@patch( |
|
"pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1) |
|
) |
|
def test_addition_decorator_error(mock_code): |
|
"""Test error when using task decorator to a function already have decorator.""" |
|
|
|
@task |
|
@foo_decorator |
|
def foo(): |
|
print(TASK_NAME) |
|
|
|
with ProcessDefinition(PD_NAME) as pd: # noqa: F841 |
|
with pytest.raises( |
|
PyDSParamException, match="Do no support other decorators for.*" |
|
): |
|
foo() |
|
|
|
|
|
@patch( |
|
"pydolphinscheduler.core.task.Task.gen_code_and_version", |
|
side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version, |
|
) |
|
def test_multiple_tasks_outside(mock_code): |
|
"""Test multiple decorator tasks which outside process definition.""" |
|
|
|
@task |
|
def foo(): |
|
print(TASK_NAME) |
|
|
|
@task |
|
def bar(): |
|
print(TASK_NAME) |
|
|
|
with ProcessDefinition(PD_NAME) as pd: |
|
foo = foo() |
|
bar = bar() |
|
|
|
foo >> bar |
|
|
|
assert pd is not None and pd.name == PD_NAME |
|
assert len(pd.tasks) == 2 |
|
|
|
task_foo = pd.get_one_task_by_name("foo") |
|
task_bar = pd.get_one_task_by_name("bar") |
|
assert set(pd.task_list) == {task_foo, task_bar} |
|
assert ( |
|
task_foo is not None |
|
and task_foo._upstream_task_codes == set() |
|
and task_foo._downstream_task_codes.pop() == task_bar.code |
|
) |
|
assert ( |
|
task_bar is not None |
|
and task_bar._upstream_task_codes.pop() == task_foo.code |
|
and task_bar._downstream_task_codes == set() |
|
) |
|
|
|
|
|
@patch( |
|
"pydolphinscheduler.core.task.Task.gen_code_and_version", |
|
side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version, |
|
) |
|
def test_multiple_tasks_inside(mock_code): |
|
"""Test multiple decorator tasks which inside process definition.""" |
|
with ProcessDefinition(PD_NAME) as pd: |
|
|
|
@task |
|
def foo(): |
|
print(TASK_NAME) |
|
|
|
@task |
|
def bar(): |
|
print(TASK_NAME) |
|
|
|
foo = foo() |
|
bar = bar() |
|
|
|
foo >> bar |
|
|
|
assert pd is not None and pd.name == PD_NAME |
|
assert len(pd.tasks) == 2 |
|
|
|
task_foo = pd.get_one_task_by_name("foo") |
|
task_bar = pd.get_one_task_by_name("bar") |
|
assert set(pd.task_list) == {task_foo, task_bar} |
|
assert ( |
|
task_foo is not None |
|
and task_foo._upstream_task_codes == set() |
|
and task_foo._downstream_task_codes.pop() == task_bar.code |
|
) |
|
assert ( |
|
task_bar is not None |
|
and task_bar._upstream_task_codes.pop() == task_foo.code |
|
and task_bar._downstream_task_codes == set() |
|
)
|
|
|