|
|
|
@ -145,3 +145,31 @@ def test_process_definition_simple():
|
|
|
|
|
assert task._downstream_task_codes == { |
|
|
|
|
pd.get_one_task_by_name(f"task-{i + 1}").code |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize( |
|
|
|
|
"user_attrs", |
|
|
|
|
[ |
|
|
|
|
{"tenant": "tenant_specific"}, |
|
|
|
|
{"queue": "queue_specific"}, |
|
|
|
|
{"tenant": "tenant_specific", "queue": "queue_specific"}, |
|
|
|
|
], |
|
|
|
|
) |
|
|
|
|
def test_set_process_definition_user_attr(user_attrs): |
|
|
|
|
"""Test user with correct attributes if we specific assigned to process definition object.""" |
|
|
|
|
default_value = { |
|
|
|
|
"tenant": ProcessDefinitionDefault.TENANT, |
|
|
|
|
"queue": ProcessDefinitionDefault.QUEUE, |
|
|
|
|
} |
|
|
|
|
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd: |
|
|
|
|
user = pd.user |
|
|
|
|
for attr in default_value: |
|
|
|
|
# Get assigned attribute if we specific, else get default value |
|
|
|
|
except_attr = ( |
|
|
|
|
user_attrs[attr] if attr in user_attrs else default_value[attr] |
|
|
|
|
) |
|
|
|
|
# Get actually attribute of user object |
|
|
|
|
actual_attr = getattr(user, attr) |
|
|
|
|
assert ( |
|
|
|
|
except_attr == actual_attr |
|
|
|
|
), f"Except attribute is {except_attr} but get {actual_attr}" |
|
|
|
|