Browse Source

[python] Add custom log for meanful an easier log formatter (#8901)

* Change logging.warning to logger.warning
* Add testing for logging about task code already in process definition

close: #8258
3.0.0/version-upgrade
JieguangZhou 3 years ago committed by GitHub
parent
commit
32a5ccac72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  2. 21
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
  3. 15
      dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -17,7 +17,7 @@
"""DolphinScheduler Task and TaskRelation object.""" """DolphinScheduler Task and TaskRelation object."""
import logging from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from pydolphinscheduler.constants import ( from pydolphinscheduler.constants import (
@ -34,6 +34,8 @@ from pydolphinscheduler.core.process_definition import (
) )
from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.java_gateway import launch_gateway
logger = getLogger(__name__)
class TaskRelation(Base): class TaskRelation(Base):
"""TaskRelation object, describe the relation of exactly two tasks.""" """TaskRelation object, describe the relation of exactly two tasks."""
@ -146,7 +148,7 @@ class Task(Base):
): ):
self.process_definition.add_task(self) self.process_definition.add_task(self)
else: else:
logging.warning( logger.warning(
"Task code %d already in process definition, prohibit re-add task.", "Task code %d already in process definition, prohibit re-add task.",
self.code, self.code,
) )

21
dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

@ -16,13 +16,16 @@
# under the License. # under the License.
"""Test Task class function.""" """Test Task class function."""
import logging
import re
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task, TaskRelation from pydolphinscheduler.core.task import Task, TaskRelation
from tests.testing.task import Task as testTask from tests.testing.task import Task as testTask
from tests.testing.task import TaskWithCode
TEST_TASK_RELATION_SET = set() TEST_TASK_RELATION_SET = set()
TEST_TASK_RELATION_SIZE = 0 TEST_TASK_RELATION_SIZE = 0
@ -222,3 +225,19 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
assert all([1 == len(getattr(t, reverse_direction_attr)) for t in tasks]) assert all([1 == len(getattr(t, reverse_direction_attr)) for t in tasks])
assert all([task.code in getattr(t, reverse_direction_attr) for t in tasks]) assert all([task.code in getattr(t, reverse_direction_attr) for t in tasks])
def test_add_duplicate(caplog):
"""Test add task which code already in process definition."""
with ProcessDefinition("test_add_duplicate_workflow") as _:
TaskWithCode(name="test_task_1", task_type="test", code=123, version=1)
with caplog.at_level(logging.WARNING):
TaskWithCode(
name="test_task_duplicate_code", task_type="test", code=123, version=2
)
assert all(
[
caplog.text.startswith("WARNING pydolphinscheduler"),
re.findall("already in process definition", caplog.text),
]
)

15
dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py

@ -30,3 +30,18 @@ class Task(SourceTask):
def gen_code_and_version(self): def gen_code_and_version(self):
"""Mock java gateway code and version, convenience method for unittest.""" """Mock java gateway code and version, convenience method for unittest."""
return uuid.uuid1().time, self.DEFAULT_VERSION return uuid.uuid1().time, self.DEFAULT_VERSION
class TaskWithCode(SourceTask):
"""Mock class :class:`pydolphinscheduler.core.task.Task` and it return some code and version."""
def __init__(
self, name: str, task_type: str, code: int, version: int, *args, **kwargs
):
self._constant_code = code
self._constant_version = version
super().__init__(name, task_type, *args, **kwargs)
def gen_code_and_version(self):
"""Mock java gateway code and version, convenience method for unittest."""
return self._constant_code, self._constant_version

Loading…
Cancel
Save