Browse Source

[python] Add resource plugin for python, dataX, CustomDataX and Sql (#12135)

3.2.0-release
chenrj 2 years ago committed by GitHub
parent
commit
9652964c94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  2. 21
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  3. 10
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py
  4. 33
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
  5. 7
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
  6. 91
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
  7. 58
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
  8. 45
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py

2
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -118,3 +118,5 @@ class Symbol(str):
SLASH = "/"
POINT = "."
COMMA = ","
UNDERLINE = "_"

21
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -17,6 +17,7 @@
"""DolphinScheduler Task and TaskRelation object."""
import copy
import types
from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@ -24,6 +25,7 @@ from pydolphinscheduler import configuration
from pydolphinscheduler.constants import (
Delimiter,
ResourceKey,
Symbol,
TaskFlag,
TaskPriority,
TaskTimeoutFlag,
@ -114,7 +116,7 @@ class Task(Base):
_task_custom_attr: set = set()
ext: set = None
ext_attr: str = None
ext_attr: Union[str, types.FunctionType] = None
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
@ -271,23 +273,26 @@ class Task(Base):
"""Get the file content according to the resource plugin."""
if self.ext_attr is None and self.ext is None:
return
_ext_attr = getattr(self, self.ext_attr)
if _ext_attr is not None:
if _ext_attr.endswith(tuple(self.ext)):
if isinstance(_ext_attr, str) and _ext_attr.endswith(tuple(self.ext)):
res = self.get_plugin()
content = res.read_file(_ext_attr)
setattr(self, self.ext_attr.lstrip("_"), content)
setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), content)
else:
index = _ext_attr.rfind(".")
if self.resource_plugin is not None or (
self.process_definition is not None
and self.process_definition.resource_plugin is not None
):
index = _ext_attr.rfind(Symbol.POINT)
if index != -1:
raise ValueError(
"This task does not support files with suffix {}, only supports {}".format(
_ext_attr[index:], ",".join(str(suf) for suf in self.ext)
_ext_attr[index:],
Symbol.COMMA.join(str(suf) for suf in self.ext),
)
)
setattr(self, self.ext_attr.lstrip("_"), _ext_attr)
setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), _ext_attr)
def __hash__(self):
return hash(self.code)

10
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py

@ -34,6 +34,9 @@ class CustomDataX(Task):
_task_custom_attr = {"custom_config", "json", "xms", "xmx"}
ext: set = {".json"}
ext_attr: str = "_json"
def __init__(
self,
name: str,
@ -43,9 +46,9 @@ class CustomDataX(Task):
*args,
**kwargs
):
self._json = json
super().__init__(name, TaskType.DATAX, *args, **kwargs)
self.custom_config = self.CUSTOM_CONFIG
self.json = json
self.xms = xms
self.xmx = xmx
@ -76,6 +79,9 @@ class DataX(Task):
"xmx",
}
ext: set = {".sql"}
ext_attr: str = "_sql"
def __init__(
self,
name: str,
@ -92,8 +98,8 @@ class DataX(Task):
*args,
**kwargs
):
self._sql = sql
super().__init__(name, TaskType.DATAX, *args, **kwargs)
self.sql = sql
self.custom_config = self.CUSTOM_CONFIG
self.datasource_name = datasource_name
self.datatarget_name = datatarget_name

33
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py

@ -55,15 +55,16 @@ class Python(Task):
want to execute.
"""
_task_custom_attr = {
"raw_script",
}
_task_custom_attr = {"raw_script", "definition"}
ext: set = {".py"}
ext_attr: Union[str, types.FunctionType] = "_definition"
def __init__(
self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
):
self._definition = definition
super().__init__(name, TaskType.PYTHON, *args, **kwargs)
self.definition = definition
def _build_exe_str(self) -> str:
"""Build executable string from given definition.
@ -71,32 +72,34 @@ class Python(Task):
Attribute ``self.definition`` almost is a function, we need to call this function after parsing it
to string. The easier way to call a function is using syntax ``func()`` and we use it to call it too.
"""
if isinstance(self.definition, types.FunctionType):
py_function = inspect.getsource(self.definition)
func_str = f"{py_function}{self.definition.__name__}()"
definition = getattr(self, "definition")
if isinstance(definition, types.FunctionType):
py_function = inspect.getsource(definition)
func_str = f"{py_function}{definition.__name__}()"
else:
pattern = re.compile("^def (\\w+)\\(")
find = pattern.findall(self.definition)
find = pattern.findall(definition)
if not find:
log.warning(
"Python definition is simple script instead of function, with value %s",
self.definition,
definition,
)
return self.definition
return definition
# Keep function str and function callable always have one blank line
func_str = (
f"{self.definition}{find[0]}()"
if self.definition.endswith("\n")
else f"{self.definition}\n{find[0]}()"
f"{definition}{find[0]}()"
if definition.endswith("\n")
else f"{definition}\n{find[0]}()"
)
return func_str
@property
def raw_script(self) -> str:
"""Get python task define attribute `raw_script`."""
if isinstance(self.definition, (str, types.FunctionType)):
if isinstance(getattr(self, "definition"), (str, types.FunctionType)):
return self._build_exe_str()
else:
raise PyDSParamException(
"Parameter definition do not support % for now.", type(self.definition)
"Parameter definition do not support % for now.",
type(getattr(self, "definition")),
)

7
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py

@ -59,6 +59,9 @@ class Sql(Task):
"display_rows",
}
ext: set = {".sql"}
ext_attr: str = "_sql"
def __init__(
self,
name: str,
@ -71,8 +74,8 @@ class Sql(Task):
*args,
**kwargs
):
self._sql = sql
super().__init__(name, TaskType.SQL, *args, **kwargs)
self.sql = sql
self.param_sql_type = sql_type
self.datasource_name = datasource_name
self.pre_statements = pre_statements or []
@ -101,7 +104,7 @@ class Sql(Task):
"|(.* |)update |(.* |)truncate |(.* |)alter |(.* |)create ).*"
)
pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
if pattern_select.match(self.sql) is None:
if pattern_select.match(self._sql) is None:
return SqlType.NOT_SELECT
else:
return SqlType.SELECT

91
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py

@ -16,12 +16,28 @@
# under the License.
"""Test Task DataX."""
from pathlib import Path
from unittest.mock import patch
import pytest
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.datax import CustomDataX, DataX
from pydolphinscheduler.utils import file
from tests.testing.file import delete_file
@pytest.fixture()
def setup_crt_first(request):
"""Set up and teardown about create file first and then delete it."""
file_content = request.param.get("file_content")
file_path = request.param.get("file_path")
file.write(
content=file_content,
to_path=file_path,
)
yield
delete_file(file_path)
@patch(
@ -122,3 +138,76 @@ def test_custom_datax_get_define(json_template):
):
task = CustomDataX(name, json_template)
assert task.get_define() == expect
@pytest.mark.parametrize(
"setup_crt_first",
[
{
"file_path": Path(__file__).parent.joinpath("local_res.sql"),
"file_content": "test local resource",
}
],
indirect=True,
)
@pytest.mark.parametrize(
"attr, expect",
[
(
{
"name": "task_datax",
"datasource_name": "first_mysql",
"datatarget_name": "second_mysql",
"sql": "local_res.sql",
"target_table": "target_table",
"resource_plugin": Local(str(Path(__file__).parent)),
},
"test local resource",
),
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_resources_local_datax_command_content(
mock_code_version, attr, expect, setup_crt_first
):
"""Test task datax sql content through the local resource plug-in."""
datax = DataX(**attr)
assert expect == getattr(datax, "sql")
@pytest.mark.parametrize(
"setup_crt_first",
[
{
"file_path": Path(__file__).parent.joinpath("local_res.json"),
"file_content": '{content: "test local resource"}',
}
],
indirect=True,
)
@pytest.mark.parametrize(
"attr, expect",
[
(
{
"name": "task_custom_datax",
"json": "local_res.json",
"resource_plugin": Local(str(Path(__file__).parent)),
},
'{content: "test local resource"}',
),
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_resources_local_custom_datax_command_content(
mock_code_version, attr, expect, setup_crt_first
):
"""Test task CustomDataX json content through the local resource plug-in."""
custom_datax = CustomDataX(**attr)
assert expect == getattr(custom_datax, "json")

58
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py

@ -16,26 +16,42 @@
# under the License.
"""Test Task python."""
from pathlib import Path
from unittest.mock import patch
import pytest
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.python import Python
from pydolphinscheduler.utils import file
from tests.testing.file import delete_file
def foo(): # noqa: D103
print("hello world.")
@pytest.fixture()
def setup_crt_first(request):
"""Set up and teardown about create file first and then delete it."""
file_content = request.param.get("file_content")
file_path = request.param.get("file_path")
file.write(
content=file_content,
to_path=file_path,
)
yield
delete_file(file_path)
@pytest.mark.parametrize(
"attr, expect",
[
(
{"definition": "print(1)"},
{
"definition": "print(1)",
"rawScript": "print(1)",
"localParams": [],
"resourceList": [],
@ -47,6 +63,7 @@ def foo(): # noqa: D103
(
{"definition": "def foo():\n print('I am foo')"},
{
"definition": "def foo():\n print('I am foo')",
"rawScript": "def foo():\n print('I am foo')\nfoo()",
"localParams": [],
"resourceList": [],
@ -58,6 +75,7 @@ def foo(): # noqa: D103
(
{"definition": foo},
{
"definition": foo,
"rawScript": 'def foo(): # noqa: D103\n print("hello world.")\nfoo()',
"localParams": [],
"resourceList": [],
@ -122,6 +140,7 @@ def test_python_get_define(name, script_code, raw):
"delayTime": 0,
"taskType": "PYTHON",
"taskParams": {
"definition": script_code,
"resourceList": [],
"localParams": [],
"rawScript": raw,
@ -145,3 +164,38 @@ def test_python_get_define(name, script_code, raw):
):
shell = Python(name, script_code)
assert shell.get_define() == expect
@pytest.mark.parametrize(
"setup_crt_first",
[
{
"file_path": Path(__file__).parent.joinpath("local_res.py"),
"file_content": "test local resource",
}
],
indirect=True,
)
@pytest.mark.parametrize(
"attr, expect",
[
(
{
"name": "task_python",
"definition": "local_res.py",
"resource_plugin": Local(str(Path(__file__).parent)),
},
"test local resource",
),
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_resources_local_python_command_content(
mock_code_version, attr, expect, setup_crt_first
):
"""Test task Python definition content through the local resource plug-in."""
python = Python(**attr)
assert expect == getattr(python, "definition")

45
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py

@ -16,13 +16,28 @@
# under the License.
"""Test Task Sql."""
from pathlib import Path
from unittest.mock import patch
import pytest
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.sql import Sql, SqlType
from pydolphinscheduler.utils import file
from tests.testing.file import delete_file
file_name = "local_res.sql"
file_content = "select 1"
res_plugin_prefix = Path(__file__).parent
file_path = res_plugin_prefix.joinpath(file_name)
@pytest.fixture
def setup_crt_first():
"""Set up and teardown about create file first and then delete it."""
file.write(content=file_content, to_path=file_path)
yield
delete_file(file_path)
@pytest.mark.parametrize(
@ -165,3 +180,29 @@ def test_sql_get_define(mock_datasource):
):
task = Sql(name, datasource_name, command)
assert task.get_define() == expect
@pytest.mark.parametrize(
"attr, expect",
[
(
{
"name": "test-sql-local-res",
"sql": file_name,
"datasource_name": "test_datasource",
"resource_plugin": Local(str(res_plugin_prefix)),
},
file_content,
)
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_resources_local_sql_command_content(
mock_code_version, attr, expect, setup_crt_first
):
"""Test sql content through the local resource plug-in."""
sql = Sql(**attr)
assert expect == getattr(sql, "sql")

Loading…
Cancel
Save