Browse Source

[python] Add config mechanism and cli subcommand config (#8585)

* [python] Add config mechanism and cli subcommand config

* Add configuration.py mechanism for more easy change config
  and move some configs to it. It mechanism including
  configuration.py module and default_config.yaml file
* Add `config` for cli subcommand allow users initialize, get,
  set configs

close: #8344

* Change setup.py format
3.0.0/version-upgrade
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
ec4ce2b573
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-python/pydolphinscheduler/setup.py
  2. 44
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py
  3. 20
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  4. 4
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
  5. 152
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
  6. 43
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
  7. 27
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  8. 4
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  9. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dependent_example.py
  10. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py
  11. 7
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
  12. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
  13. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
  14. 8
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
  15. 85
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/path_dict.py
  16. 201
      dolphinscheduler-python/pydolphinscheduler/tests/cli/test_config.py
  17. 2
      dolphinscheduler-python/pydolphinscheduler/tests/cli/test_version.py
  18. 45
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py
  19. 39
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_default_config_yaml.py
  20. 42
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
  21. 10
      dolphinscheduler-python/pydolphinscheduler/tests/testing/cli.py
  22. 8
      dolphinscheduler-python/pydolphinscheduler/tests/testing/constants.py
  23. 13
      dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py
  24. 201
      dolphinscheduler-python/pydolphinscheduler/tests/utils/test_path_dict.py

1
dolphinscheduler-python/pydolphinscheduler/setup.py

@ -38,6 +38,7 @@ version = "2.0.4"
prod = [
"click>=8.0.0",
"py4j~=0.10",
"pyyaml",
]
build = [

44
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py

@ -21,6 +21,11 @@ import click
from click import echo
from pydolphinscheduler import __version__
from pydolphinscheduler.core.configuration import (
get_single_config,
init_config_file,
set_single_config,
)
version_option_val = ["major", "minor", "micro"]
@ -46,3 +51,42 @@ def version(part: str) -> None:
echo(f"{__version__.split('.')[idx]}")
else:
echo(f"{__version__}")
@cli.command()
@click.option(
"--init",
"-i",
is_flag=True,
help="Initialize and create configuration file to `PYDOLPHINSCHEDULER_HOME`.",
)
@click.option(
"--set",
"-s",
"setter",
multiple=True,
type=click.Tuple([str, str]),
help="Set specific setting to config file."
"Use multiple ``--set <KEY> <VAL>`` options to set multiple configs",
)
@click.option(
"--get",
"-g",
"getter",
multiple=True,
type=str,
help="Get specific setting from config file."
"Use multiple ``--get <KEY>`` options to get multiple configs",
)
def config(getter, setter, init) -> None:
"""Manage the configuration for pydolphinscheduler."""
if init:
init_config_file()
elif getter:
click.echo("The configuration query as below:\n")
configs_kv = [f"{key} = {get_single_config(key)}" for key in getter]
click.echo("\n".join(configs_kv))
elif setter:
for key, val in setter:
set_single_config(key, val)
click.echo("Set configuration done.")

20
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -25,22 +25,6 @@ class ProcessDefinitionReleaseState:
OFFLINE: str = "OFFLINE"
class ProcessDefinitionDefault:
"""Constants default value for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition`."""
PROJECT: str = "project-pydolphin"
TENANT: str = "tenant_pydolphin"
USER: str = "userPythonGateway"
# TODO simple set password same as username
USER_PWD: str = "userPythonGateway"
USER_EMAIL: str = "userPythonGateway@dolphinscheduler.com"
USER_PHONE: str = "11111111111"
USER_STATE: int = 1
QUEUE: str = "queuePythonGateway"
WORKER_GROUP: str = "default"
TIME_ZONE: str = "Asia/Shanghai"
class TaskPriority(str):
"""Constants for task priority."""
@ -99,10 +83,6 @@ class JavaGatewayDefault(str):
RESULT_DATA = "data"
SERVER_ADDRESS = "127.0.0.1"
SERVER_PORT = 25333
AUTO_CONVERT = True
class Delimiter(str):
"""Constants for delimiter."""

4
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py

@ -19,7 +19,7 @@
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base import Base
@ -34,7 +34,7 @@ class BaseSide(Base):
cls,
# TODO comment for avoiding cycle import
# user: Optional[User] = ProcessDefinitionDefault.USER
user=ProcessDefinitionDefault.USER,
user=configuration.WORKFLOW_USER,
):
"""Create Base if not exists."""
raise NotImplementedError

152
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py

@ -0,0 +1,152 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Configuration module for pydolphinscheduler."""
import copy
import os
from pathlib import Path
from typing import Any, Dict
import yaml
from pydolphinscheduler.exceptions import PyDSConfException, PyDSParamException
from pydolphinscheduler.utils.path_dict import PathDict
DEFAULT_CONFIG_PATH = Path(__file__).resolve().parent.joinpath("default_config.yaml")
def get_config_file_path() -> Path:
"""Get the path of pydolphinscheduler configuration file."""
pyds_home = os.environ.get("PYDOLPHINSCHEDULER_HOME", "~/pydolphinscheduler")
config_file_path = Path(pyds_home).joinpath("config.yaml").expanduser()
return config_file_path
def read_yaml(path: str) -> Dict:
"""Read configs dict from configuration file.
:param path: The path of configuration file.
"""
with open(path, "r") as f:
return yaml.safe_load(f)
def write_yaml(context: Dict, path: str) -> None:
"""Write configs dict to configuration file.
:param context: The configs dict write to configuration file.
:param path: The path of configuration file.
"""
parent = Path(path).parent
if not parent.exists():
parent.mkdir(parents=True)
with open(path, mode="w") as f:
f.write(yaml.dump(context))
def default_yaml_config() -> Dict:
"""Get default configs in ``DEFAULT_CONFIG_PATH``."""
with open(DEFAULT_CONFIG_PATH, "r") as f:
return yaml.safe_load(f)
def _whether_exists_config() -> bool:
"""Check whether config file already exists in :func:`get_config_file_path`."""
return True if get_config_file_path().exists() else False
def get_all_configs():
"""Get all configs from configuration file."""
exists = _whether_exists_config()
if exists:
return read_yaml(str(get_config_file_path()))
else:
return default_yaml_config()
# Add configs as module variables to avoid read configuration multiple times when
# Get common configuration setting
# Set or get multiple configs in single time
configs = get_all_configs()
def init_config_file() -> None:
"""Initialize configuration file to :func:`get_config_file_path`."""
if _whether_exists_config():
raise PyDSConfException(
"Initialize configuration false to avoid overwrite configure by accident, file already exists "
"in %s, if you wan to overwrite the exists configure please remove the exists file manually.",
str(get_config_file_path()),
)
write_yaml(context=default_yaml_config(), path=str(get_config_file_path()))
def get_single_config(key: str) -> Any:
"""Get single config to configuration file.
:param key: The config path want get.
"""
global configs
config_path_dict = PathDict(configs)
if key not in config_path_dict:
raise PyDSParamException(
"Configuration path %s do not exists. Can not get configuration.", key
)
return config_path_dict.__getattr__(key)
def set_single_config(key: str, value: Any) -> None:
"""Change single config to configuration file.
:param key: The config path want change.
:param value: The new value want to set.
"""
global configs
config_path_dict = PathDict(configs)
if key not in config_path_dict:
raise PyDSParamException(
"Configuration path %s do not exists. Can not set configuration.", key
)
config_path_dict.__setattr__(key, value)
write_yaml(context=dict(config_path_dict), path=str(get_config_file_path()))
# Start Common Configuration Settings
path_configs = PathDict(copy.deepcopy(configs))
# Java Gateway Settings
JAVA_GATEWAY_ADDRESS = str(getattr(path_configs, "java_gateway.address"))
JAVA_GATEWAY_PORT = str(getattr(path_configs, "java_gateway.port"))
JAVA_GATEWAY_AUTO_CONVERT = str(getattr(path_configs, "java_gateway.auto_convert"))
# User Settings
USER_NAME = str(getattr(path_configs, "default.user.name"))
USER_PASSWORD = str(getattr(path_configs, "default.user.password"))
USER_EMAIL = str(getattr(path_configs, "default.user.email"))
USER_PHONE = str(getattr(path_configs, "default.user.phone"))
USER_STATE = str(getattr(path_configs, "default.user.state"))
# Workflow Settings
WORKFLOW_PROJECT = str(getattr(path_configs, "default.workflow.project"))
WORKFLOW_TENANT = str(getattr(path_configs, "default.workflow.tenant"))
WORKFLOW_USER = str(getattr(path_configs, "default.workflow.user"))
WORKFLOW_QUEUE = str(getattr(path_configs, "default.workflow.queue"))
WORKFLOW_WORKER_GROUP = str(getattr(path_configs, "default.workflow.worker_group"))
WORKFLOW_TIME_ZONE = str(getattr(path_configs, "default.workflow.time_zone"))
# End Common Configuration Setting

43
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml

@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
java_gateway:
# The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
# between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
address: 127.0.0.1
# The port of Python gateway server start. Define which port you could connect to Python gateway server from
# Python API side.
port: 25333
auto_convert: true
default:
user:
name: userPythonGateway
# TODO simple set password same as username
password: userPythonGateway
email: userPythonGateway@dolphinscheduler.com
tenant: tenant_pydolphin
phone: 11111111111
state: 1
workflow:
project: project-pydolphin
tenant: tenant_pydolphin
user: userPythonGateway
queue: queuePythonGateway
worker_group: default
time_zone: Asia/Shanghai

27
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -21,11 +21,8 @@ import json
from datetime import datetime
from typing import Any, Dict, List, Optional, Set
from pydolphinscheduler.constants import (
ProcessDefinitionDefault,
ProcessDefinitionReleaseState,
TaskType,
)
from pydolphinscheduler.constants import ProcessDefinitionReleaseState, TaskType
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
from pydolphinscheduler.java_gateway import launch_gateway
@ -90,12 +87,12 @@ class ProcessDefinition(Base):
schedule: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
timezone: Optional[str] = ProcessDefinitionDefault.TIME_ZONE,
user: Optional[str] = ProcessDefinitionDefault.USER,
project: Optional[str] = ProcessDefinitionDefault.PROJECT,
tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE,
user: Optional[str] = configuration.WORKFLOW_USER,
project: Optional[str] = configuration.WORKFLOW_PROJECT,
tenant: Optional[str] = configuration.WORKFLOW_TENANT,
queue: Optional[str] = configuration.WORKFLOW_QUEUE,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
param: Optional[Dict] = None,
@ -153,12 +150,12 @@ class ProcessDefinition(Base):
"""
return User(
self._user,
ProcessDefinitionDefault.USER_PWD,
ProcessDefinitionDefault.USER_EMAIL,
ProcessDefinitionDefault.USER_PHONE,
configuration.USER_PASSWORD,
configuration.USER_EMAIL,
configuration.USER_PHONE,
self._tenant,
self._queue,
ProcessDefinitionDefault.USER_STATE,
configuration.USER_STATE,
)
@staticmethod

4
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -22,11 +22,11 @@ from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from pydolphinscheduler.constants import (
Delimiter,
ProcessDefinitionDefault,
TaskFlag,
TaskPriority,
TaskTimeoutFlag,
)
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.core.process_definition import (
ProcessDefinition,
@ -104,7 +104,7 @@ class Task(Base):
description: Optional[str] = None,
flag: Optional[str] = TaskFlag.YES,
task_priority: Optional[str] = TaskPriority.MEDIUM,
worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
delay_time: Optional[int] = 0,
fail_retry_times: Optional[int] = 0,
fail_retry_interval: Optional[int] = 1,

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dependent_example.py

@ -35,7 +35,7 @@ task_dependent:
task_dependent(this task dependent on task_dependent_external.task_1 and task_dependent_external.task_2).
"""
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or
from pydolphinscheduler.tasks.shell import Shell
@ -58,12 +58,12 @@ with ProcessDefinition(
dependence=And(
Or(
DependentItem(
project_name=ProcessDefinitionDefault.PROJECT,
project_name=configuration.WORKFLOW_PROJECT,
process_definition_name="task_dependent_external",
dependent_task_name="task_1",
),
DependentItem(
project_name=ProcessDefinitionDefault.PROJECT,
project_name=configuration.WORKFLOW_PROJECT,
process_definition_name="task_dependent_external",
dependent_task_name="task_2",
),

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py

@ -44,3 +44,9 @@ class PyDSJavaGatewayException(PyDSBaseException):
class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
"""Exception for pydolphinscheduler process definition not assign error."""
class PyDSConfException(PyDSBaseException):
"""Exception for pydolphinscheduler configuration error."""
pass

7
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py

@ -23,6 +23,7 @@ from py4j.java_collections import JavaMap
from py4j.java_gateway import GatewayParameters, JavaGateway
from pydolphinscheduler.constants import JavaGatewayDefault
from pydolphinscheduler.core import configuration
from pydolphinscheduler.exceptions import PyDSJavaGatewayException
@ -38,9 +39,9 @@ def launch_gateway(
This is why automatic conversion is disabled by default.
"""
gateway_parameters = GatewayParameters(
address=address or JavaGatewayDefault.SERVER_ADDRESS,
port=port or JavaGatewayDefault.SERVER_PORT,
auto_convert=auto_convert or JavaGatewayDefault.AUTO_CONVERT,
address=address or configuration.JAVA_GATEWAY_ADDRESS,
port=port or configuration.JAVA_GATEWAY_PORT,
auto_convert=auto_convert or configuration.JAVA_GATEWAY_AUTO_CONVERT,
)
gateway = JavaGateway(gateway_parameters=gateway_parameters)
return gateway

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py

@ -19,7 +19,7 @@
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway
@ -29,12 +29,12 @@ class Project(BaseSide):
def __init__(
self,
name: str = ProcessDefinitionDefault.PROJECT,
name: str = configuration.WORKFLOW_PROJECT,
description: Optional[str] = None,
):
super().__init__(name, description)
def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
"""Create Project if not exists."""
gateway = launch_gateway()
gateway.entry_point.createProject(user, self.name, self.description)

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py

@ -19,7 +19,7 @@
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import gateway_result_checker, launch_gateway
@ -29,12 +29,12 @@ class Queue(BaseSide):
def __init__(
self,
name: str = ProcessDefinitionDefault.QUEUE,
name: str = configuration.WORKFLOW_QUEUE,
description: Optional[str] = "",
):
super().__init__(name, description)
def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
"""Create Queue if not exists."""
gateway = launch_gateway()
# Here we set Queue.name and Queue.queueName same as self.name

8
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py

@ -19,7 +19,7 @@
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway
@ -29,15 +29,15 @@ class Tenant(BaseSide):
def __init__(
self,
name: str = ProcessDefinitionDefault.TENANT,
queue: str = ProcessDefinitionDefault.QUEUE,
name: str = configuration.WORKFLOW_TENANT,
queue: str = configuration.WORKFLOW_QUEUE,
description: Optional[str] = None,
):
super().__init__(name, description)
self.queue = queue
def create_if_not_exists(
self, queue_name: str, user=ProcessDefinitionDefault.USER
self, queue_name: str, user=configuration.USER_NAME
) -> None:
"""Create Tenant if not exists."""
gateway = launch_gateway()

85
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/path_dict.py

@ -0,0 +1,85 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Path dict allow users access value by key chain, like `var.key1.key2.key3`."""
# TODO maybe we should rewrite it by `collections.abc.MutableMapping` later,
# according to https://stackoverflow.com/q/3387691/7152658
class PathDict(dict):
"""Path dict allow users access value by key chain, like `var.key1.key2.key3`."""
def __init__(self, original=None):
super().__init__()
if original is None:
pass
elif isinstance(original, dict):
for key in original:
self.__setitem__(key, original[key])
else:
raise TypeError(
"Parameter original expected dict type but get %s", type(original)
)
def __getitem__(self, key):
if "." not in key:
# try:
return dict.__getitem__(self, key)
# except KeyError:
# # cPickle would get error when key without value pairs, in this case we just skip it
# return
my_key, rest_of_key = key.split(".", 1)
target = dict.__getitem__(self, my_key)
if not isinstance(target, PathDict):
raise KeyError(
'Cannot get "%s" to (%s) as sub-key of "%s".'
% (rest_of_key, repr(target), my_key)
)
return target[rest_of_key]
def __setitem__(self, key, value):
if "." in key:
my_key, rest_of_key = key.split(".", 1)
target = self.setdefault(my_key, PathDict())
if not isinstance(target, PathDict):
raise KeyError(
'Cannot set "%s" from (%s) as sub-key of "%s"'
% (rest_of_key, repr(target), my_key)
)
target[rest_of_key] = value
else:
if isinstance(value, dict) and not isinstance(value, PathDict):
value = PathDict(value)
dict.__setitem__(self, key, value)
def __contains__(self, key):
if "." not in key:
return dict.__contains__(self, key)
my_key, rest_of_key = key.split(".", 1)
target = dict.__getitem__(self, my_key)
if not isinstance(target, PathDict):
return False
return rest_of_key in target
def setdefault(self, key, default):
"""Overwrite method dict.setdefault."""
if key not in self:
self[key] = default
return self[key]
__setattr__ = __setitem__
__getattr__ = __getitem__

201
dolphinscheduler-python/pydolphinscheduler/tests/cli/test_config.py

@ -0,0 +1,201 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test command line interface subcommand `config`."""
import os
from pathlib import Path
import pytest
from pydolphinscheduler.cli.commands import cli
from pydolphinscheduler.core.configuration import get_config_file_path
from tests.testing.cli import CliTestWrapper
from tests.testing.constants import DEV_MODE
default_config_path = "~/pydolphinscheduler"
config_file = "config.yaml"
@pytest.fixture
def delete_tmp_config_file():
"""Util for deleting temp configuration file after test finish."""
yield
config_file_path = get_config_file_path()
config_file_path.unlink()
@pytest.mark.skipif(
DEV_MODE,
reason="Avoid delete ~/pydolphinscheduler/config.yaml by accident when test locally.",
)
@pytest.mark.parametrize(
"home",
[
None,
"/tmp/pydolphinscheduler",
"/tmp/test_abc",
],
)
def test_config_init(delete_tmp_config_file, home):
"""Test command line interface `config --init`."""
if home:
os.environ["PYDOLPHINSCHEDULER_HOME"] = home
config_path = home
else:
config_path = default_config_path
path = Path(config_path).joinpath(config_file).expanduser()
assert not path.exists()
cli_test = CliTestWrapper(cli, ["config", "--init"])
cli_test.assert_success()
assert path.exists()
# TODO We have a bug here, yaml dump do not support comment
# with path.open(mode="r") as cli_crt, open(path_default_config_yaml, "r") as src:
# assert src.read() == cli_crt.read()
@pytest.mark.parametrize(
"key, expect",
[
# We test each key in one single section
("java_gateway.address", "127.0.0.1"),
("default.user.name", "userPythonGateway"),
("default.workflow.project", "project-pydolphin"),
],
)
def test_config_get(delete_tmp_config_file, key: str, expect: str):
"""Test command line interface `config --get XXX`."""
os.environ["PYDOLPHINSCHEDULER_HOME"] = "/tmp/pydolphinscheduler"
cli_test = CliTestWrapper(cli, ["config", "--init"])
cli_test.assert_success()
cli_test = CliTestWrapper(cli, ["config", "--get", key])
cli_test.assert_success(output=f"{key} = {expect}", fuzzy=True)
@pytest.mark.parametrize(
"keys, expects",
[
# We test mix section keys
(("java_gateway.address", "java_gateway.port"), ("127.0.0.1", "25333")),
(
("java_gateway.auto_convert", "default.user.tenant"),
("True", "tenant_pydolphin"),
),
(
(
"java_gateway.port",
"default.user.state",
"default.workflow.worker_group",
),
("25333", "1", "default"),
),
],
)
def test_config_get_multiple(delete_tmp_config_file, keys: str, expects: str):
"""Test command line interface `config --get KEY1 --get KEY2 ...`."""
os.environ["PYDOLPHINSCHEDULER_HOME"] = "/tmp/pydolphinscheduler"
cli_test = CliTestWrapper(cli, ["config", "--init"])
cli_test.assert_success()
get_args = ["config"]
for key in keys:
get_args.append("--get")
get_args.append(key)
cli_test = CliTestWrapper(cli, get_args)
for idx, expect in enumerate(expects):
cli_test.assert_success(output=f"{keys[idx]} = {expect}", fuzzy=True)
# TODO fix command `config --set KEY VAL`
@pytest.mark.skip(reason="We still have bug in command `config --set KEY VAL`")
@pytest.mark.parametrize(
"key, value",
[
# We test each key in one single section
("java_gateway.address", "127.1.1.1"),
("default.user.name", "editUserPythonGateway"),
("default.workflow.project", "edit-project-pydolphin"),
],
)
def test_config_set(delete_tmp_config_file, key: str, value: str):
"""Test command line interface `config --set KEY VALUE`."""
os.environ["PYDOLPHINSCHEDULER_HOME"] = "/tmp/pydolphinscheduler"
cli_test = CliTestWrapper(cli, ["config", "--init"])
cli_test.assert_success()
# Make sure value do not exists first
cli_test = CliTestWrapper(cli, ["config", "--get", key])
assert f"{key} = {value}" not in cli_test.result.output
cli_test = CliTestWrapper(cli, ["config", "--set", key, value])
cli_test.assert_success()
cli_test = CliTestWrapper(cli, ["config", "--get", key])
assert f"{key} = {value}" in cli_test.result.output
# TODO do not skip `config --set KEY1 VAL1 --set KEY2 VAL2`
@pytest.mark.skip(
reason="We still have bug in command `config --set KEY1 VAL1 --set KEY2 VAL2`"
)
@pytest.mark.parametrize(
"keys, values",
[
# We test each key in mixture section
(("java_gateway.address", "java_gateway.port"), ("127.1.1.1", "25444")),
(
("java_gateway.auto_convert", "default.user.tenant"),
("False", "edit_tenant_pydolphin"),
),
(
(
"java_gateway.port",
"default.user.state",
"default.workflow.worker_group",
),
("25555", "0", "not-default"),
),
],
)
def test_config_set_multiple(delete_tmp_config_file, keys: str, values: str):
"""Test command line interface `config --set KEY1 VAL1 --set KEY2 VAL2`."""
os.environ["PYDOLPHINSCHEDULER_HOME"] = "/tmp/pydolphinscheduler"
cli_test = CliTestWrapper(cli, ["config", "--init"])
cli_test.assert_success()
set_args = ["config"]
for idx, key in enumerate(keys):
# Make sure values do not exists first
cli_test = CliTestWrapper(cli, ["config", "--get", key])
assert f"{key} = {values[idx]}" not in cli_test.result.output
set_args.append("--set")
set_args.append(key)
set_args.append(values[idx])
cli_test = CliTestWrapper(cli, set_args)
cli_test.assert_success()
for idx, key in enumerate(keys):
# Make sure values exists after `config --set` run
cli_test = CliTestWrapper(cli, ["config", "--get", key])
assert f"{key} = {values[idx]}" in cli_test.result.output

2
dolphinscheduler-python/pydolphinscheduler/tests/cli/test_version.py

@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
"""Test command line interface subcommand version."""
"""Test command line interface subcommand `version`."""
import pytest

45
dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py

@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test class :mod:`pydolphinscheduler.core.configuration`' method."""
import os
from pathlib import Path
import pytest
from pydolphinscheduler.core import configuration
@pytest.mark.parametrize(
"env, expect",
[
(None, "~/pydolphinscheduler"),
("/tmp/pydolphinscheduler", "/tmp/pydolphinscheduler"),
("/tmp/test_abc", "/tmp/test_abc"),
],
)
def test_get_config_file_path(env, expect):
"""Test get config file path method."""
# Avoid env setting by other tests
os.environ.pop("PYDOLPHINSCHEDULER_HOME", None)
if env:
os.environ["PYDOLPHINSCHEDULER_HOME"] = env
assert (
Path(expect).joinpath("config.yaml").expanduser()
== configuration.get_config_file_path()
)

39
dolphinscheduler-python/pydolphinscheduler/tests/core/test_default_config_yaml.py

@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test default config file."""
from typing import Dict
import yaml
from tests.testing.path import path_default_config_yaml
def nested_key_check(test_dict: Dict) -> None:
"""Test whether default configuration file exists specific character."""
for key, val in test_dict.items():
assert "." not in key, f"There is not allowed special character in key `{key}`."
if isinstance(val, dict):
nested_key_check(val)
def test_key_without_dot_delimiter():
"""Test wrapper of whether default configuration file exists specific character."""
with open(path_default_config_yaml, "r") as f:
default_config = yaml.safe_load(f)
nested_key_check(default_config)

42
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -24,10 +24,8 @@ from unittest.mock import patch
import pytest
from freezegun import freeze_time
from pydolphinscheduler.constants import (
ProcessDefinitionDefault,
ProcessDefinitionReleaseState,
)
from pydolphinscheduler.constants import ProcessDefinitionReleaseState
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.side import Project, Tenant, User
@ -51,22 +49,22 @@ def test_process_definition_key_attr(func):
@pytest.mark.parametrize(
"name,value",
[
("timezone", ProcessDefinitionDefault.TIME_ZONE),
("project", Project(ProcessDefinitionDefault.PROJECT)),
("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
("timezone", configuration.WORKFLOW_TIME_ZONE),
("project", Project(configuration.WORKFLOW_PROJECT)),
("tenant", Tenant(configuration.WORKFLOW_TENANT)),
(
"user",
User(
ProcessDefinitionDefault.USER,
ProcessDefinitionDefault.USER_PWD,
ProcessDefinitionDefault.USER_EMAIL,
ProcessDefinitionDefault.USER_PHONE,
ProcessDefinitionDefault.TENANT,
ProcessDefinitionDefault.QUEUE,
ProcessDefinitionDefault.USER_STATE,
configuration.USER_NAME,
configuration.USER_PASSWORD,
configuration.USER_EMAIL,
configuration.USER_PHONE,
configuration.WORKFLOW_TENANT,
configuration.WORKFLOW_QUEUE,
configuration.USER_STATE,
),
),
("worker_group", ProcessDefinitionDefault.WORKER_GROUP),
("worker_group", configuration.WORKFLOW_WORKER_GROUP),
("release_state", ProcessDefinitionReleaseState.ONLINE),
],
)
@ -233,9 +231,9 @@ def test_process_definition_get_define_without_task():
expect = {
"name": TEST_PROCESS_DEFINITION_NAME,
"description": None,
"project": ProcessDefinitionDefault.PROJECT,
"tenant": ProcessDefinitionDefault.TENANT,
"workerGroup": ProcessDefinitionDefault.WORKER_GROUP,
"project": configuration.WORKFLOW_PROJECT,
"tenant": configuration.WORKFLOW_TENANT,
"workerGroup": configuration.WORKFLOW_WORKER_GROUP,
"timeout": 0,
"releaseState": ProcessDefinitionReleaseState.ONLINE,
"param": None,
@ -318,8 +316,8 @@ def test_process_definition_simple_separate():
def test_set_process_definition_user_attr(user_attrs):
"""Test user with correct attributes if we specific assigned to process definition object."""
default_value = {
"tenant": ProcessDefinitionDefault.TENANT,
"queue": ProcessDefinitionDefault.QUEUE,
"tenant": configuration.WORKFLOW_TENANT,
"queue": configuration.WORKFLOW_QUEUE,
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
user = pd.user
@ -407,13 +405,13 @@ def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
"crontab": schedule,
"startTime": expect_date["start_time"],
"endTime": expect_date["end_time"],
"timezoneId": ProcessDefinitionDefault.TIME_ZONE,
"timezoneId": configuration.WORKFLOW_TIME_ZONE,
}
with ProcessDefinition(
TEST_PROCESS_DEFINITION_NAME,
schedule=schedule,
start_time=start_time,
end_time=end_time,
timezone=ProcessDefinitionDefault.TIME_ZONE,
timezone=configuration.WORKFLOW_TIME_ZONE,
) as pd:
assert pd.schedule_json == expect

10
dolphinscheduler-python/pydolphinscheduler/tests/testing/cli.py

@ -18,17 +18,14 @@
"""Utils of command line test."""
import os
from click.testing import CliRunner
from tests.testing.constants import DEV_MODE
class CliTestWrapper:
"""Wrap command click CliRunner.invoke."""
_dev_mode_env_name = "PY_DOLPHINSCHEDULER_DEV_MODE"
_dev_mode_true_val = {"true", "t", "1"}
def __init__(self, *args, **kwargs):
runner = CliRunner()
self.result = runner.invoke(*args, **kwargs)
@ -55,8 +52,7 @@ class CliTestWrapper:
It read variable named `PY_DOLPHINSCHEDULER_DEV_MODE` from env, when it set to `true` or `t` or `1`
will print result output when class :class:`CliTestWrapper` is initialization.
"""
dev_mode = str(os.getenv(self._dev_mode_env_name))
if dev_mode.strip().lower() in self._dev_mode_true_val:
if DEV_MODE:
print(f"\n{self.result.output}\n")
def assert_success(self, output: str = None, fuzzy: bool = False):

8
dolphinscheduler-python/pydolphinscheduler/tests/testing/constants.py

@ -17,6 +17,8 @@
"""Constants variables for test module."""
import os
# Record some task without example in directory `example`. Some of them maybe can not write example,
# but most of them just without adding by mistake, and we should add it later.
task_without_example = {
@ -26,3 +28,9 @@ task_without_example = {
"python",
"procedure",
}
# whether in dev mode, if true we will add or remove some tests. Or make be and more detail infos when
# test failed.
DEV_MODE = str(
os.environ.get("PY_DOLPHINSCHEDULER_DEV_MODE", False)
).strip().lower() in {"true", "t", "1"}

13
dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py

@ -20,13 +20,14 @@
from pathlib import Path
from typing import Any, Generator
path_code_tasks = Path(__file__).parent.parent.parent.joinpath(
"src", "pydolphinscheduler", "tasks"
)
path_example = Path(__file__).parent.parent.parent.joinpath(
"src", "pydolphinscheduler", "examples"
project_root = Path(__file__).parent.parent.parent
path_code_tasks = project_root.joinpath("src", "pydolphinscheduler", "tasks")
path_example = project_root.joinpath("src", "pydolphinscheduler", "examples")
path_doc_tasks = project_root.joinpath("docs", "source", "tasks")
path_default_config_yaml = project_root.joinpath(
"src", "pydolphinscheduler", "core", "default_config.yaml"
)
path_doc_tasks = Path(__file__).parent.parent.parent.joinpath("docs", "source", "tasks")
def get_all_examples() -> Generator[Path, Any, None]:

201
dolphinscheduler-python/pydolphinscheduler/tests/utils/test_path_dict.py

@ -0,0 +1,201 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test utils.path_dict module."""
import copy
from typing import Dict, Tuple
import pytest
from pydolphinscheduler.utils.path_dict import PathDict
src_dict_list = [
# dict with one single level
{"a": 1},
# dict with two levels, with same nested keys 'b'
{"a": 1, "b": 2, "c": {"d": 3}, "e": {"b": 4}},
# dict with three levels, with same nested keys 'b'
{"a": 1, "b": 2, "c": {"d": 3}, "e": {"b": {"b": 4}, "f": 5}},
# dict with specific key container
{
"a": 1,
"a-b": 2,
},
]
@pytest.mark.parametrize("org", src_dict_list)
def test_val_between_dict_and_path_dict(org: Dict):
"""Test path dict equal to original dict."""
path_dict = PathDict(org)
assert org == dict(path_dict)
def test_path_dict_basic_attr_access():
"""Test basic behavior of path dict.
Including add by attribute, with simple, nested dict, and specific key dict.
"""
expect = copy.deepcopy(src_dict_list[2])
path_dict = PathDict(expect)
# Add node with one level
val = 3
path_dict.f = val
expect.update({"f": val})
assert expect == path_dict
# Add node with multiple level
val = {"abc": 123}
path_dict.e.g = val
expect.update({"e": {"b": {"b": 4}, "f": 5, "g": val}})
assert expect == path_dict
# Specific key
expect = copy.deepcopy(src_dict_list[3])
path_dict = PathDict(expect)
assert 1 == path_dict.a
assert 2 == getattr(path_dict, "a-b")
@pytest.mark.parametrize(
"org, exists, not_exists",
[
(
src_dict_list[0],
("a"),
("b", "a.b"),
),
(
src_dict_list[1],
("a", "b", "c", "e", "c.d", "e.b"),
("a.b", "c.e", "b.c", "b.e"),
),
(
src_dict_list[2],
("a", "b", "c", "e", "c.d", "e.b", "e.b.b", "e.b.b", "e.f"),
("a.b", "c.e", "b.c", "b.e", "b.b.f", "b.f"),
),
],
)
def test_path_dict_attr(org: Dict, exists: Tuple, not_exists: Tuple):
"""Test properties' integrity of path dict."""
path_dict = PathDict(org)
assert all([hasattr(path_dict, path) for path in exists])
# assert not any([hasattr(path_dict, path) for path in not_exists])
@pytest.mark.parametrize(
"org, path_get",
[
(
src_dict_list[0],
{"a": 1},
),
(
src_dict_list[1],
{
"a": 1,
"b": 2,
"c": {"d": 3},
"c.d": 3,
"e": {"b": 4},
"e.b": 4,
},
),
(
src_dict_list[2],
{
"a": 1,
"b": 2,
"c": {"d": 3},
"c.d": 3,
"e": {"b": {"b": 4}, "f": 5},
"e.b": {"b": 4},
"e.b.b": 4,
"e.f": 5,
},
),
],
)
def test_path_dict_get(org: Dict, path_get: Dict):
"""Test path dict getter function."""
path_dict = PathDict(org)
assert all([path_get[path] == path_dict.__getattr__(path) for path in path_get])
@pytest.mark.parametrize(
"org, path_set, expect",
[
# Add not exists node
(
src_dict_list[0],
{"b": 2},
{
"a": 1,
"b": 2,
},
),
# Overwrite exists node with different type of value
(
src_dict_list[0],
{"a": "b"},
{"a": "b"},
),
# Add multiple not exists node with variable types of value
(
src_dict_list[0],
{
"b.c.d": 123,
"b.c.e": "a",
"b.f": {"g": 23, "h": "bc", "i": {"j": "k"}},
},
{
"a": 1,
"b": {
"c": {
"d": 123,
"e": "a",
},
"f": {"g": 23, "h": "bc", "i": {"j": "k"}},
},
},
),
# Test complex original data
(
src_dict_list[2],
{
"g": 12,
"c.h": 34,
},
{
"a": 1,
"b": 2,
"g": 12,
"c": {"d": 3, "h": 34},
"e": {"b": {"b": 4}, "f": 5},
},
),
],
)
def test_path_dict_set(org: Dict, path_set: Dict, expect: Dict):
"""Test path dict setter function."""
path_dict = PathDict(org)
for path in path_set:
path_dict.__setattr__(path, path_set[path])
assert expect == path_dict
Loading…
Cancel
Save