分布式调度框架。
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

419 lines
14 KiB

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test process definition."""
from datetime import datetime
from typing import Any
from unittest.mock import patch
import pytest
from freezegun import freeze_time
from pydolphinscheduler.constants import (
ProcessDefinitionDefault,
ProcessDefinitionReleaseState,
)
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.side import Project, Tenant, User
from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
from pydolphinscheduler.utils.date import conv_to_schedule
from tests.testing.task import Task
TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
TEST_TASK_TYPE = "test-task-type"
@pytest.mark.parametrize("func", ["run", "submit", "start"])
def test_process_definition_key_attr(func):
"""Test process definition have specific functions or attributes."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert hasattr(
pd, func
), f"ProcessDefinition instance don't have attribute `{func}`"
@pytest.mark.parametrize(
"name,value",
[
("timezone", ProcessDefinitionDefault.TIME_ZONE),
("project", Project(ProcessDefinitionDefault.PROJECT)),
("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
(
"user",
User(
ProcessDefinitionDefault.USER,
ProcessDefinitionDefault.USER_PWD,
ProcessDefinitionDefault.USER_EMAIL,
ProcessDefinitionDefault.USER_PHONE,
ProcessDefinitionDefault.TENANT,
ProcessDefinitionDefault.QUEUE,
ProcessDefinitionDefault.USER_STATE,
),
),
("worker_group", ProcessDefinitionDefault.WORKER_GROUP),
("release_state", ProcessDefinitionReleaseState.ONLINE),
],
)
def test_process_definition_default_value(name, value):
"""Test process definition default attributes."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert getattr(pd, name) == value, (
f"ProcessDefinition instance attribute `{name}` not with "
f"except default value `{getattr(pd, name)}`"
)
@pytest.mark.parametrize(
"name,cls,expect",
[
("name", str, "name"),
("description", str, "description"),
("schedule", str, "schedule"),
("timezone", str, "timezone"),
("worker_group", str, "worker_group"),
("timeout", int, 1),
("release_state", str, "OFFLINE"),
("param", dict, {"key": "value"}),
],
)
def test_set_attr(name, cls, expect):
"""Test process definition set attributes which get with same type."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
setattr(pd, name, expect)
assert (
getattr(pd, name) == expect
), f"ProcessDefinition set attribute `{name}` do not work expect"
@pytest.mark.parametrize(
"set_attr,set_val,get_attr,get_val",
[
("_project", "project", "project", Project("project")),
("_tenant", "tenant", "tenant", Tenant("tenant")),
("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)),
("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)),
],
)
def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
"""Test process definition set attributes which get with different type."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
setattr(pd, set_attr, set_val)
assert get_val == getattr(
pd, get_attr
), f"Set attribute {set_attr} can not get back with {get_val}."
@pytest.mark.parametrize(
"val,expect",
[
(datetime(2021, 1, 1), datetime(2021, 1, 1)),
(None, None),
("2021-01-01", datetime(2021, 1, 1)),
("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)),
],
)
def test__parse_datetime(val, expect):
"""Test process definition function _parse_datetime.
Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
"""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert expect == pd._parse_datetime(
val
), f"Function _parse_datetime with unexpect value by {val}."
@pytest.mark.parametrize(
"val",
[
20210101,
(2021, 1, 1),
{"year": "2021", "month": "1", "day": 1},
],
)
def test__parse_datetime_not_support_type(val: Any):
"""Test process definition function _parse_datetime not support type error."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
with pytest.raises(PyDSParamException, match="Do not support value type.*?"):
pd._parse_datetime(val)
@pytest.mark.parametrize(
"param, expect",
[
(
None,
[],
),
(
{},
[],
),
(
{"key1": "val1"},
[
{
"prop": "key1",
"direct": "IN",
"type": "VARCHAR",
"value": "val1",
}
],
),
(
{
"key1": "val1",
"key2": "val2",
},
[
{
"prop": "key1",
"direct": "IN",
"type": "VARCHAR",
"value": "val1",
},
{
"prop": "key2",
"direct": "IN",
"type": "VARCHAR",
"value": "val2",
},
],
),
],
)
def test_property_param_json(param, expect):
"""Test ProcessDefinition's property param_json."""
pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
assert pd.param_json == expect
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test__pre_submit_check_switch_without_param(mock_code_version):
"""Test :func:`_pre_submit_check` if process definition with switch but without attribute param."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
parent = Task(name="parent", task_type=TEST_TASK_TYPE)
switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
switch_condition = SwitchCondition(
Branch(condition="${var} > 1", task=switch_child_1),
Default(task=switch_child_2),
)
switch = Switch(name="switch", condition=switch_condition)
parent >> switch
with pytest.raises(
PyDSParamException,
match="Parameter param must be provider if task Switch in process definition.",
):
pd._pre_submit_check()
def test_process_definition_get_define_without_task():
"""Test process definition function get_define without task."""
expect = {
"name": TEST_PROCESS_DEFINITION_NAME,
"description": None,
"project": ProcessDefinitionDefault.PROJECT,
"tenant": ProcessDefinitionDefault.TENANT,
"workerGroup": ProcessDefinitionDefault.WORKER_GROUP,
"timeout": 0,
"releaseState": ProcessDefinitionReleaseState.ONLINE,
"param": None,
"tasks": {},
"taskDefinitionJson": [{}],
"taskRelationJson": [{}],
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert pd.get_define() == expect
def test_process_definition_simple_context_manager():
"""Test simple create workflow in process definition context manager mode."""
expect_tasks_num = 5
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
for i in range(expect_tasks_num):
curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
# Set deps task i as i-1 parent
if i > 0:
pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
curr_task.set_upstream(pre_task)
assert len(pd.tasks) == expect_tasks_num
# Test if task process_definition same as origin one
task: Task = pd.get_one_task_by_name("task-0")
assert pd is task.process_definition
# Test if all tasks with expect deps
for i in range(expect_tasks_num):
task: Task = pd.get_one_task_by_name(f"task-{i}")
if i == 0:
assert task._upstream_task_codes == set()
assert task._downstream_task_codes == {
pd.get_one_task_by_name("task-1").code
}
elif i == expect_tasks_num - 1:
assert task._upstream_task_codes == {
pd.get_one_task_by_name(f"task-{i - 1}").code
}
assert task._downstream_task_codes == set()
else:
assert task._upstream_task_codes == {
pd.get_one_task_by_name(f"task-{i - 1}").code
}
assert task._downstream_task_codes == {
pd.get_one_task_by_name(f"task-{i + 1}").code
}
def test_process_definition_simple_separate():
"""Test process definition simple create workflow in separate mode.
This test just test basic information, cause most of test case is duplicate to
test_process_definition_simple_context_manager.
"""
expect_tasks_num = 5
pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
for i in range(expect_tasks_num):
curr_task = Task(
name=f"task-{i}",
task_type=f"type-{i}",
process_definition=pd,
)
# Set deps task i as i-1 parent
if i > 0:
pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
curr_task.set_upstream(pre_task)
assert len(pd.tasks) == expect_tasks_num
assert all(["task-" in task.name for task in pd.task_list])
@pytest.mark.parametrize(
"user_attrs",
[
{"tenant": "tenant_specific"},
{"queue": "queue_specific"},
{"tenant": "tenant_specific", "queue": "queue_specific"},
],
)
def test_set_process_definition_user_attr(user_attrs):
"""Test user with correct attributes if we specific assigned to process definition object."""
default_value = {
"tenant": ProcessDefinitionDefault.TENANT,
"queue": ProcessDefinitionDefault.QUEUE,
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
user = pd.user
for attr in default_value:
# Get assigned attribute if we specific, else get default value
except_attr = (
user_attrs[attr] if attr in user_attrs else default_value[attr]
)
# Get actually attribute of user object
actual_attr = getattr(user, attr)
assert (
except_attr == actual_attr
), f"Except attribute is {except_attr} but get {actual_attr}"
def test_schedule_json_none_schedule():
"""Test function schedule_json with None as schedule."""
with ProcessDefinition(
TEST_PROCESS_DEFINITION_NAME,
schedule=None,
) as pd:
assert pd.schedule_json is None
# We freeze time here, because we test start_time with None, and if will get datetime.datetime.now. If we do
# not freeze time, it will cause flaky test here.
@freeze_time("2021-01-01")
@pytest.mark.parametrize(
"start_time,end_time,expect_date",
[
(
"20210101",
"20210201",
{"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
),
(
"2021-01-01",
"2021-02-01",
{"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
),
(
"2021/01/01",
"2021/02/01",
{"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
),
# Test mix pattern
(
"2021/01/01 01:01:01",
"2021-02-02 02:02:02",
{"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
),
(
"2021/01/01 01:01:01",
"20210202 020202",
{"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
),
(
"20210101 010101",
"2021-02-02 02:02:02",
{"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
),
# Test None value
(
"2021/01/01 01:02:03",
None,
{"start_time": "2021-01-01 01:02:03", "end_time": "9999-12-31 23:59:59"},
),
(
None,
None,
{
"start_time": conv_to_schedule(datetime(2021, 1, 1)),
"end_time": "9999-12-31 23:59:59",
},
),
],
)
def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
"""Test function schedule_json about handle start_time and end_time.
Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
"""
schedule = "0 0 0 * * ? *"
expect = {
"crontab": schedule,
"startTime": expect_date["start_time"],
"endTime": expect_date["end_time"],
"timezoneId": ProcessDefinitionDefault.TIME_ZONE,
}
with ProcessDefinition(
TEST_PROCESS_DEFINITION_NAME,
schedule=schedule,
start_time=start_time,
end_time=end_time,
timezone=ProcessDefinitionDefault.TIME_ZONE,
) as pd:
assert pd.schedule_json == expect