Browse Source

[python] Refactor get object define communicate to gateway (#7272)

* Change class Base `to_dict` to `get_define` for more clearer
* Remove class TaskParams and sub class make code easy to
  understand and avoid task implement cycle dependence
* Remove class ObjectJsonBase in Task to reduce complexity

fix: #7271
3.0.0/version-upgrade
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
2a7d6b468f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 43
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
  2. 6
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  3. 132
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  4. 47
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py
  5. 33
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
  6. 18
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
  7. 78
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
  8. 55
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
  9. 14
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
  10. 76
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
  11. 55
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
  12. 60
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
  13. 43
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
  14. 42
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
  15. 53
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py

43
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py

@ -26,11 +26,14 @@ from pydolphinscheduler.utils.string import attr2camel
class Base: class Base:
"""DolphinScheduler Base object.""" """DolphinScheduler Base object."""
# Object key attribute, to test whether object equals and so on.
_KEY_ATTR: set = {"name", "description"} _KEY_ATTR: set = {"name", "description"}
_TO_DICT_ATTR: set = set() # Object defines attribute, use when needs to communicate with Java gateway server.
_DEFINE_ATTR: set = set()
DEFAULT_ATTR: Dict = {} # Object default attribute, will add those attribute to `_DEFINE_ATTR` when init assign missing.
_DEFAULT_ATTR: Dict = {}
def __init__(self, name: str, description: Optional[str] = None): def __init__(self, name: str, description: Optional[str] = None):
self.name = name self.name = name
@ -44,28 +47,28 @@ class Base:
getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR
) )
# TODO check how Redash do def get_define_custom(
# TODO DRY self, camel_attr: bool = True, custom_attr: set = None
def to_dict(self, camel_attr=True) -> Dict: ) -> Dict:
"""Get object key attribute dict. """Get object definition attribute by given attr set."""
use attribute `self._TO_DICT_ATTR` to determine which attributes should including to
children `to_dict` function.
"""
# content = {}
# for attr, value in self.__dict__.items():
# # Don't publish private variables
# if attr.startswith("_"):
# continue
# else:
# content[snake2camel(attr)] = value
# content.update(self.DEFAULT_ATTR)
# return content
content = {} content = {}
for attr in self._TO_DICT_ATTR: for attr in custom_attr:
val = getattr(self, attr, None) val = getattr(self, attr, None)
if camel_attr: if camel_attr:
content[attr2camel(attr)] = val content[attr2camel(attr)] = val
else: else:
content[attr] = val content[attr] = val
return content return content
def get_define(self, camel_attr: bool = True) -> Dict:
"""Get object definition attribute communicate to Java gateway server.
use attribute `self._DEFINE_ATTR` to determine which attributes should including when
object tries to communicate with Java gateway server.
"""
content = self.get_define_custom(camel_attr, self._DEFINE_ATTR)
update_default = {
k: self._DEFAULT_ATTR.get(k) for k in self._DEFAULT_ATTR if k not in content
}
content.update(update_default)
return content

6
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -68,7 +68,7 @@ class ProcessDefinition(Base):
"param", "param",
} }
_TO_DICT_ATTR = { _DEFINE_ATTR = {
"name", "name",
"description", "description",
"_project", "_project",
@ -195,7 +195,7 @@ class ProcessDefinition(Base):
if not self.tasks: if not self.tasks:
return [self.tasks] return [self.tasks]
else: else:
return [task.to_dict() for task in self.tasks.values()] return [task.get_define() for task in self.tasks.values()]
@property @property
def task_relation_json(self) -> List[Dict]: def task_relation_json(self) -> List[Dict]:
@ -204,7 +204,7 @@ class ProcessDefinition(Base):
return [self.tasks] return [self.tasks]
else: else:
self._handle_root_relation() self._handle_root_relation()
return [tr.to_dict() for tr in self._task_relations] return [tr.get_define() for tr in self._task_relations]
@property @property
def schedule_json(self) -> Optional[Dict]: def schedule_json(self) -> Optional[Dict]:

132
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -15,7 +15,7 @@
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
"""DolphinScheduler ObjectJsonBase, TaskParams and Task object.""" """DolphinScheduler Task and TaskRelation object."""
import logging import logging
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@ -32,61 +32,17 @@ from pydolphinscheduler.core.process_definition import (
ProcessDefinitionContext, ProcessDefinitionContext,
) )
from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.utils.string import class_name2camel, snake2camel
class ObjectJsonBase: class TaskRelation(Base):
"""Task base class, define `__str__` and `to_dict` function would be use in other task related class."""
DEFAULT_ATTR = {}
def __int__(self, *args, **kwargs):
pass
def __str__(self) -> str:
content = []
for attribute, value in self.__dict__.items():
content.append(f'"{snake2camel(attribute)}": {value}')
content = ",".join(content)
return f'"{class_name2camel(type(self).__name__)}":{{{content}}}'
# TODO check how Redash do
# TODO DRY
def to_dict(self) -> Dict:
"""Get object key attribute dict which determine by attribute `DEFAULT_ATTR`."""
content = {snake2camel(attr): value for attr, value in self.__dict__.items()}
content.update(self.DEFAULT_ATTR)
return content
class TaskParams(ObjectJsonBase):
"""TaskParams object, describe the key parameter of a single task."""
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
def __init__(
self,
local_params: Optional[List] = None,
resource_list: Optional[List] = None,
dependence: Optional[Dict] = None,
wait_start_timeout: Optional[Dict] = None,
condition_result: Optional[Dict] = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.local_params = local_params or []
self.resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
# TODO need better way to handle it, this code just for POC
self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
class TaskRelation(ObjectJsonBase):
"""TaskRelation object, describe the relation of exactly two tasks.""" """TaskRelation object, describe the relation of exactly two tasks."""
DEFAULT_ATTR = { _DEFINE_ATTR = {
"pre_task_code",
"post_task_code",
}
_DEFAULT_ATTR = {
"name": "", "name": "",
"preTaskVersion": 1, "preTaskVersion": 1,
"postTaskVersion": 1, "postTaskVersion": 1,
@ -98,8 +54,9 @@ class TaskRelation(ObjectJsonBase):
self, self,
pre_task_code: int, pre_task_code: int,
post_task_code: int, post_task_code: int,
name: Optional[str] = None,
): ):
super().__init__() super().__init__(name)
self.pre_task_code = pre_task_code self.pre_task_code = pre_task_code
self.post_task_code = post_task_code self.post_task_code = post_task_code
@ -110,19 +67,32 @@ class TaskRelation(ObjectJsonBase):
class Task(Base): class Task(Base):
"""Task object, parent class for all exactly task type.""" """Task object, parent class for all exactly task type."""
DEFAULT_DEPS_ATTR = { _DEFINE_ATTR = {
"name": "", "name",
"preTaskVersion": 1, "code",
"postTaskVersion": 1, "version",
"conditionType": 0, "task_type",
"conditionParams": {}, "task_params",
"description",
"flag",
"task_priority",
"worker_group",
"delay_time",
"fail_retry_times",
"fail_retry_interval",
"timeout_flag",
"timeout_notify_strategy",
"timeout",
} }
_task_custom_attr: set = set()
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
def __init__( def __init__(
self, self,
name: str, name: str,
task_type: str, task_type: str,
task_params: TaskParams,
description: Optional[str] = None, description: Optional[str] = None,
flag: Optional[str] = TaskFlag.YES, flag: Optional[str] = TaskFlag.YES,
task_priority: Optional[str] = TaskPriority.MEDIUM, task_priority: Optional[str] = TaskPriority.MEDIUM,
@ -134,11 +104,15 @@ class Task(Base):
timeout_notify_strategy: Optional = None, timeout_notify_strategy: Optional = None,
timeout: Optional[int] = 0, timeout: Optional[int] = 0,
process_definition: Optional[ProcessDefinition] = None, process_definition: Optional[ProcessDefinition] = None,
local_params: Optional[List] = None,
resource_list: Optional[List] = None,
dependence: Optional[Dict] = None,
wait_start_timeout: Optional[Dict] = None,
condition_result: Optional[Dict] = None,
): ):
super().__init__(name, description) super().__init__(name, description)
self.task_type = task_type self.task_type = task_type
self.task_params = task_params
self.flag = flag self.flag = flag
self.task_priority = task_priority self.task_priority = task_priority
self.worker_group = worker_group self.worker_group = worker_group
@ -169,6 +143,13 @@ class Task(Base):
self.code, self.code,
) )
# Attribute for task param
self.local_params = local_params or []
self.resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
@property @property
def process_definition(self) -> Optional[ProcessDefinition]: def process_definition(self) -> Optional[ProcessDefinition]:
"""Get attribute process_definition.""" """Get attribute process_definition."""
@ -179,6 +160,22 @@ class Task(Base):
"""Set attribute process_definition.""" """Set attribute process_definition."""
self._process_definition = process_definition self._process_definition = process_definition
@property
def task_params(self) -> Optional[Dict]:
"""Get task parameter object.
Will get result to combine _task_custom_attr and custom_attr.
"""
custom_attr = {
"local_params",
"resource_list",
"dependence",
"wait_start_timeout",
"condition_result",
}
custom_attr |= self._task_custom_attr
return self.get_define_custom(custom_attr=custom_attr)
def __hash__(self): def __hash__(self):
return hash(self.code) return hash(self.code)
@ -259,16 +256,3 @@ class Task(Base):
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT) # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result) # gateway_result_checker(result)
return result.get("code"), result.get("version") return result.get("code"), result.get("version")
def to_dict(self, camel_attr=True) -> Dict:
"""Task `to_dict` function which will return key attribute for Task object."""
content = {}
for attr, value in self.__dict__.items():
# Don't publish private variables
if attr.startswith("_"):
continue
elif isinstance(value, TaskParams):
content[snake2camel(attr)] = value.to_dict()
else:
content[snake2camel(attr)] = value
return content

47
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py

@ -20,7 +20,7 @@
from typing import Optional from typing import Optional
from pydolphinscheduler.constants import TaskType from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task, TaskParams from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.exceptions import PyDSParamException
@ -50,11 +50,22 @@ class HttpCheckCondition:
BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS" BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS"
class HttpTaskParams(TaskParams): class Http(Task):
"""Parameter only for Http task types.""" """Task HTTP object, declare behavior for HTTP task to dolphinscheduler."""
_task_custom_attr = {
"url",
"http_method",
"http_params",
"http_check_condition",
"condition",
"connect_timeout",
"socket_timeout",
}
def __init__( def __init__(
self, self,
name: str,
url: str, url: str,
http_method: Optional[str] = HttpMethod.GET, http_method: Optional[str] = HttpMethod.GET,
http_params: Optional[str] = None, http_params: Optional[str] = None,
@ -65,7 +76,7 @@ class HttpTaskParams(TaskParams):
*args, *args,
**kwargs **kwargs
): ):
super().__init__(*args, **kwargs) super().__init__(name, TaskType.HTTP, *args, **kwargs)
self.url = url self.url = url
if not hasattr(HttpMethod, http_method): if not hasattr(HttpMethod, http_method):
raise PyDSParamException( raise PyDSParamException(
@ -88,31 +99,3 @@ class HttpTaskParams(TaskParams):
self.condition = condition self.condition = condition
self.connect_timeout = connect_timeout self.connect_timeout = connect_timeout
self.socket_timeout = socket_timeout self.socket_timeout = socket_timeout
class Http(Task):
"""Task HTTP object, declare behavior for HTTP task to dolphinscheduler."""
def __init__(
self,
name: str,
url: str,
http_method: Optional[str] = HttpMethod.GET,
http_params: Optional[str] = None,
http_check_condition: Optional[str] = HttpCheckCondition.STATUS_CODE_DEFAULT,
condition: Optional[str] = None,
connect_timeout: Optional[int] = 60000,
socket_timeout: Optional[int] = 60000,
*args,
**kwargs
):
task_params = HttpTaskParams(
url=url,
http_method=http_method,
http_params=http_params,
http_check_condition=http_check_condition,
condition=condition,
connect_timeout=connect_timeout,
socket_timeout=socket_timeout,
)
super().__init__(name, TaskType.HTTP, task_params, *args, **kwargs)

33
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py

@ -22,29 +22,30 @@ import types
from typing import Any from typing import Any
from pydolphinscheduler.constants import TaskType from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task, TaskParams from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.exceptions import PyDSParamException
class PythonTaskParams(TaskParams):
"""Parameter only for Python task types."""
def __init__(self, raw_script: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.raw_script = raw_script
class Python(Task): class Python(Task):
"""Task Python object, declare behavior for Python task to dolphinscheduler.""" """Task Python object, declare behavior for Python task to dolphinscheduler."""
_task_custom_attr = {
"raw_script",
}
def __init__(self, name: str, code: Any, *args, **kwargs): def __init__(self, name: str, code: Any, *args, **kwargs):
if isinstance(code, str): super().__init__(name, TaskType.PYTHON, *args, **kwargs)
task_params = PythonTaskParams(raw_script=code) self._code = code
elif isinstance(code, types.FunctionType):
py_function = inspect.getsource(code) @property
task_params = PythonTaskParams(raw_script=py_function) def raw_script(self) -> str:
"""Get python task define attribute `raw_script`."""
if isinstance(self._code, str):
return self._code
elif isinstance(self._code, types.FunctionType):
py_function = inspect.getsource(self._code)
return py_function
else: else:
raise PyDSParamException( raise PyDSParamException(
"Parameter code do not support % for now.", type(code) "Parameter code do not support % for now.", type(self._code)
) )
super().__init__(name, TaskType.PYTHON, task_params, *args, **kwargs)

18
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py

@ -18,15 +18,7 @@
"""Task shell.""" """Task shell."""
from pydolphinscheduler.constants import TaskType from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task, TaskParams from pydolphinscheduler.core.task import Task
class ShellTaskParams(TaskParams):
"""Parameter only for shell task types."""
def __init__(self, raw_script: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.raw_script = raw_script
class Shell(Task): class Shell(Task):
@ -37,6 +29,10 @@ class Shell(Task):
task.name assign to `task_shell` task.name assign to `task_shell`
""" """
_task_custom_attr = {
"raw_script",
}
def __init__(self, name: str, command: str, *args, **kwargs): def __init__(self, name: str, command: str, *args, **kwargs):
task_params = ShellTaskParams(raw_script=command) super().__init__(name, TaskType.SHELL, *args, **kwargs)
super().__init__(name, TaskType.SHELL, task_params, *args, **kwargs) self.raw_script = command

78
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py

@ -21,7 +21,7 @@ import re
from typing import Dict, Optional from typing import Dict, Optional
from pydolphinscheduler.constants import TaskType from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task, TaskParams from pydolphinscheduler.core.task import Task
from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.java_gateway import launch_gateway
@ -32,31 +32,6 @@ class SqlType:
NOT_SELECT = 1 NOT_SELECT = 1
class SqlTaskParams(TaskParams):
"""Parameter only for Sql task type."""
def __init__(
self,
type: str,
datasource: str,
sql: str,
sql_type: Optional[int] = SqlType.NOT_SELECT,
display_rows: Optional[int] = 10,
pre_statements: Optional[str] = None,
post_statements: Optional[str] = None,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.type = type
self.datasource = datasource
self.sql = sql
self.sql_type = sql_type
self.display_rows = display_rows
self.pre_statements = pre_statements or []
self.post_statements = post_statements or []
class Sql(Task): class Sql(Task):
"""Task SQL object, declare behavior for SQL task to dolphinscheduler. """Task SQL object, declare behavior for SQL task to dolphinscheduler.
@ -73,38 +48,40 @@ class Sql(Task):
database type and database instance would run this sql. database type and database instance would run this sql.
""" """
_task_custom_attr = {
"sql",
"sql_type",
"pre_statements",
"post_statements",
"display_rows",
}
def __init__( def __init__(
self, self,
name: str, name: str,
datasource_name: str, datasource_name: str,
sql: str, sql: str,
pre_sql: Optional[str] = None, pre_statements: Optional[str] = None,
post_sql: Optional[str] = None, post_statements: Optional[str] = None,
display_rows: Optional[int] = 10, display_rows: Optional[int] = 10,
*args, *args,
**kwargs **kwargs
): ):
self._sql = sql super().__init__(name, TaskType.SQL, *args, **kwargs)
self._datasource_name = datasource_name self.datasource_name = datasource_name
self.sql = sql
self.pre_statements = pre_statements or []
self.post_statements = post_statements or []
self.display_rows = display_rows
self._datasource = {} self._datasource = {}
task_params = SqlTaskParams(
type=self.get_datasource_type(),
datasource=self.get_datasource_id(),
sql=sql,
sql_type=self.sql_type,
display_rows=display_rows,
pre_statements=pre_sql,
post_statements=post_sql,
)
super().__init__(name, TaskType.SQL, task_params, *args, **kwargs)
def get_datasource_type(self) -> str: def get_datasource_type(self) -> str:
"""Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`.""" """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
return self.get_datasource_info(self._datasource_name).get("type") return self.get_datasource_info(self.datasource_name).get("type")
def get_datasource_id(self) -> str: def get_datasource_id(self) -> str:
"""Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`.""" """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
return self.get_datasource_info(self._datasource_name).get("id") return self.get_datasource_info(self.datasource_name).get("id")
def get_datasource_info(self, name) -> Dict: def get_datasource_info(self, name) -> Dict:
"""Get datasource info from java gateway, contains datasource id, type, name.""" """Get datasource info from java gateway, contains datasource id, type, name."""
@ -122,7 +99,22 @@ class Sql(Task):
"^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*" "^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
) )
pattern_select = re.compile(pattern_select_str, re.IGNORECASE) pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
if pattern_select.match(self._sql) is None: if pattern_select.match(self.sql) is None:
return SqlType.NOT_SELECT return SqlType.NOT_SELECT
else: else:
return SqlType.SELECT return SqlType.SELECT
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
"""Override Task.task_params for sql task.
Sql task have some specials attribute for task_params, and is odd if we
directly set as python property, so we Override Task.task_params here.
"""
params = super().task_params
custom_params = {
"type": self.get_datasource_type(),
"datasource": self.get_datasource_id(),
}
params.update(custom_params)
return params

55
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py

@ -20,57 +20,36 @@
from typing import Dict from typing import Dict
from pydolphinscheduler.constants import TaskType from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.process_definition import ProcessDefinitionContext from pydolphinscheduler.core.task import Task
from pydolphinscheduler.core.task import Task, TaskParams
from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException
from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.java_gateway import launch_gateway
class SubProcessTaskParams(TaskParams):
"""Parameter only for Sub Process task type."""
def __init__(self, process_definition_code, *args, **kwargs):
super().__init__(*args, **kwargs)
self.process_definition_code = process_definition_code
class SubProcess(Task): class SubProcess(Task):
"""Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler.""" """Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler."""
def __init__(self, name: str, process_definition_name: str, *args, **kwargs): _task_custom_attr = {"process_definition_code"}
self._process_definition_name = process_definition_name
self._process_definition_info = {}
# TODO: Optimize the way of obtaining process_definition
self.process_definition = kwargs.get(
"process_definition", ProcessDefinitionContext.get()
)
if not self.process_definition:
raise PyDSProcessDefinitionNotAssignException(
"ProcessDefinition must be provider when SubProcess initialization."
)
task_params = SubProcessTaskParams( def __init__(self, name: str, process_definition_name: str, *args, **kwargs):
process_definition_code=self.get_process_definition_code(), super().__init__(name, TaskType.SUB_PROCESS, *args, **kwargs)
) self.process_definition_name = process_definition_name
super().__init__(name, TaskType.SUB_PROCESS, task_params, *args, **kwargs)
def get_process_definition_code(self) -> str: @property
def process_definition_code(self) -> str:
"""Get process definition code, a wrapper for :func:`get_process_definition_info`.""" """Get process definition code, a wrapper for :func:`get_process_definition_info`."""
return self.get_process_definition_info(self._process_definition_name).get( return self.get_process_definition_info(self.process_definition_name).get(
"code" "code"
) )
def get_process_definition_info(self, process_definition_name: str) -> Dict: def get_process_definition_info(self, process_definition_name: str) -> Dict:
"""Get process definition info from java gateway, contains process definition id, name, code.""" """Get process definition info from java gateway, contains process definition id, name, code."""
if self._process_definition_info: if not self.process_definition:
return self._process_definition_info raise PyDSProcessDefinitionNotAssignException(
else: "ProcessDefinition must be provider for task SubProcess."
gateway = launch_gateway()
self._process_definition_info = (
gateway.entry_point.getProcessDefinitionInfo(
self.process_definition.user.name,
self.process_definition.project.name,
process_definition_name,
)
) )
return self._process_definition_info gateway = launch_gateway()
return gateway.entry_point.getProcessDefinitionInfo(
self.process_definition.user.name,
self.process_definition.project.name,
process_definition_name,
)

14
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -28,7 +28,6 @@ from pydolphinscheduler.constants import (
ProcessDefinitionReleaseState, ProcessDefinitionReleaseState,
) )
from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import TaskParams
from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.side import Project, Tenant, User from pydolphinscheduler.side import Project, Tenant, User
from pydolphinscheduler.utils.date import conv_to_schedule from pydolphinscheduler.utils.date import conv_to_schedule
@ -152,8 +151,8 @@ def test__parse_datetime_not_support_type(val: Any):
pd._parse_datetime(val) pd._parse_datetime(val)
def test_process_definition_to_dict_without_task(): def test_process_definition_get_define_without_task():
"""Test process definition function to_dict without task.""" """Test process definition function get_define without task."""
expect = { expect = {
"name": TEST_PROCESS_DEFINITION_NAME, "name": TEST_PROCESS_DEFINITION_NAME,
"description": None, "description": None,
@ -168,7 +167,7 @@ def test_process_definition_to_dict_without_task():
"taskRelationJson": [{}], "taskRelationJson": [{}],
} }
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert pd.to_dict() == expect assert pd.get_define() == expect
def test_process_definition_simple_context_manager(): def test_process_definition_simple_context_manager():
@ -176,10 +175,7 @@ def test_process_definition_simple_context_manager():
expect_tasks_num = 5 expect_tasks_num = 5
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
for i in range(expect_tasks_num): for i in range(expect_tasks_num):
task_params = TaskParams() curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
curr_task = Task(
name=f"task-{i}", task_type=f"type-{i}", task_params=task_params
)
# Set deps task i as i-1 parent # Set deps task i as i-1 parent
if i > 0: if i > 0:
pre_task = pd.get_one_task_by_name(f"task-{i - 1}") pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
@ -221,11 +217,9 @@ def test_process_definition_simple_separate():
expect_tasks_num = 5 expect_tasks_num = 5
pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
for i in range(expect_tasks_num): for i in range(expect_tasks_num):
task_params = TaskParams()
curr_task = Task( curr_task = Task(
name=f"task-{i}", name=f"task-{i}",
task_type=f"type-{i}", task_type=f"type-{i}",
task_params=task_params,
process_definition=pd, process_definition=pd,
) )
# Set deps task i as i-1 parent # Set deps task i as i-1 parent

76
dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

@ -21,21 +21,49 @@ from unittest.mock import patch
import pytest import pytest
from pydolphinscheduler.core.task import Task, TaskParams, TaskRelation from pydolphinscheduler.core.task import Task, TaskRelation
from tests.testing.task import Task as testTask from tests.testing.task import Task as testTask
def test_task_params_to_dict(): @pytest.mark.parametrize(
"""Test TaskParams object function to_dict.""" "attr, expect",
expect = { [
"resourceList": [], (
"localParams": [], dict(),
"dependence": {}, {
"conditionResult": TaskParams.DEFAULT_CONDITION_RESULT, "localParams": [],
"waitStartTimeout": {}, "resourceList": [],
} "dependence": {},
task_param = TaskParams() "waitStartTimeout": {},
assert task_param.to_dict() == expect "conditionResult": {"successNode": [""], "failedNode": [""]},
},
),
(
{
"local_params": ["foo", "bar"],
"resource_list": ["foo", "bar"],
"dependence": {"foo", "bar"},
"wait_start_timeout": {"foo", "bar"},
"condition_result": {"foo": ["bar"]},
},
{
"localParams": ["foo", "bar"],
"resourceList": ["foo", "bar"],
"dependence": {"foo", "bar"},
"waitStartTimeout": {"foo", "bar"},
"conditionResult": {"foo": ["bar"]},
},
),
],
)
def test_property_task_params(attr, expect):
"""Test class task property."""
task = testTask(
"test-property-task-params",
"test-task",
**attr,
)
assert expect == task.task_params
def test_task_relation_to_dict(): def test_task_relation_to_dict():
@ -54,15 +82,15 @@ def test_task_relation_to_dict():
task_param = TaskRelation( task_param = TaskRelation(
pre_task_code=pre_task_code, post_task_code=post_task_code pre_task_code=pre_task_code, post_task_code=post_task_code
) )
assert task_param.to_dict() == expect assert task_param.get_define() == expect
def test_task_to_dict(): def test_task_get_define():
"""Test Task object function to_dict.""" """Test Task object function get_define."""
code = 123 code = 123
version = 1 version = 1
name = "test_task_to_dict" name = "test_task_get_define"
task_type = "test_task_to_dict_type" task_type = "test_task_get_define_type"
expect = { expect = {
"code": code, "code": code,
"name": name, "name": name,
@ -90,8 +118,8 @@ def test_task_to_dict():
"pydolphinscheduler.core.task.Task.gen_code_and_version", "pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version), return_value=(code, version),
): ):
task = Task(name=name, task_type=task_type, task_params=TaskParams()) task = Task(name=name, task_type=task_type)
assert task.to_dict() == expect assert task.get_define() == expect
@pytest.mark.parametrize("shift", ["<<", ">>"]) @pytest.mark.parametrize("shift", ["<<", ">>"])
@ -100,8 +128,8 @@ def test_two_tasks_shift(shift: str):
Here we test both `>>` and `<<` bit operator. Here we test both `>>` and `<<` bit operator.
""" """
upstream = testTask(name="upstream", task_type=shift, task_params=TaskParams()) upstream = testTask(name="upstream", task_type=shift)
downstream = testTask(name="downstream", task_type=shift, task_params=TaskParams()) downstream = testTask(name="downstream", task_type=shift)
if shift == "<<": if shift == "<<":
downstream << upstream downstream << upstream
elif shift == ">>": elif shift == ">>":
@ -137,10 +165,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
"downstream": "upstream", "downstream": "upstream",
} }
task_type = "dep_task_and_tasks" task_type = "dep_task_and_tasks"
task = testTask(name="upstream", task_type=task_type, task_params=TaskParams()) task = testTask(name="upstream", task_type=task_type)
tasks = [ tasks = [
testTask(name="downstream1", task_type=task_type, task_params=TaskParams()), testTask(name="downstream1", task_type=task_type),
testTask(name="downstream2", task_type=task_type, task_params=TaskParams()), testTask(name="downstream2", task_type=task_type),
] ]
# Use build-in function eval to simply test case and reduce duplicate code # Use build-in function eval to simply test case and reduce duplicate code

55
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py

@ -22,12 +22,7 @@ from unittest.mock import patch
import pytest import pytest
from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.http import ( from pydolphinscheduler.tasks.http import Http, HttpCheckCondition, HttpMethod
Http,
HttpCheckCondition,
HttpMethod,
HttpTaskParams,
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -50,6 +45,38 @@ def test_attr_exists(class_name, attrs):
assert all(hasattr(class_name, attr) for attr in attrs) assert all(hasattr(class_name, attr) for attr in attrs)
@pytest.mark.parametrize(
"attr, expect",
[
(
{"url": "https://www.apache.org"},
{
"url": "https://www.apache.org",
"httpMethod": "GET",
"httpParams": [],
"httpCheckCondition": "STATUS_CODE_DEFAULT",
"condition": None,
"connectTimeout": 60000,
"socketTimeout": 60000,
"localParams": [],
"resourceList": [],
"dependence": {},
"waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
},
)
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_property_task_params(mock_code_version, attr, expect):
"""Test task http property."""
task = Http("test-http-task-params", **attr)
assert expect == task.task_params
@pytest.mark.parametrize( @pytest.mark.parametrize(
"param", "param",
[ [
@ -62,18 +89,22 @@ def test_attr_exists(class_name, attrs):
}, },
], ],
) )
def test_http_task_param_not_support_param(param): @patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_http_task_param_not_support_param(mock_code, param):
"""Test HttpTaskParams not support parameter.""" """Test HttpTaskParams not support parameter."""
url = "https://www.apache.org" url = "https://www.apache.org"
with pytest.raises(PyDSParamException, match="Parameter .*?"): with pytest.raises(PyDSParamException, match="Parameter .*?"):
HttpTaskParams(url, **param) Http("test-no-supprot-param", url, **param)
def test_http_to_dict(): def test_http_get_define():
"""Test task HTTP function to_dict.""" """Test task HTTP function get_define."""
code = 123 code = 123
version = 1 version = 1
name = "test_http_to_dict" name = "test_http_get_define"
url = "https://www.apache.org" url = "https://www.apache.org"
expect = { expect = {
"code": code, "code": code,
@ -110,4 +141,4 @@ def test_http_to_dict():
return_value=(code, version), return_value=(code, version),
): ):
http = Http(name, url) http = Http(name, url)
assert http.to_dict() == expect assert http.get_define() == expect

60
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py

@ -23,26 +23,33 @@ from unittest.mock import patch
import pytest import pytest
from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.python import Python, PythonTaskParams from pydolphinscheduler.tasks.python import Python
@pytest.mark.parametrize( @pytest.mark.parametrize(
"name, value", "attr, expect",
[ [
("local_params", "local_params"), (
("resource_list", "resource_list"), {"code": "print(1)"},
("dependence", "dependence"), {
("wait_start_timeout", "wait_start_timeout"), "rawScript": "print(1)",
("condition_result", "condition_result"), "localParams": [],
"resourceList": [],
"dependence": {},
"waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
},
)
], ],
) )
def test_python_task_params_attr_setter(name, value): @patch(
"""Test python task parameters.""" "pydolphinscheduler.core.task.Task.gen_code_and_version",
command = 'print("hello world.")' return_value=(123, 1),
python_task_params = PythonTaskParams(command) )
assert command == python_task_params.raw_script def test_property_task_params(mock_code_version, attr, expect):
setattr(python_task_params, name, value) """Test task python property."""
assert value == getattr(python_task_params, name) task = Python("test-python-task-params", **attr)
assert expect == task.task_params
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -52,19 +59,16 @@ def test_python_task_params_attr_setter(name, value):
("print", "hello world"), ("print", "hello world"),
], ],
) )
def test_python_task_not_support_code(script_code): @patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_python_task_not_support_code(mock_code, script_code):
"""Test python task parameters.""" """Test python task parameters."""
name = "not_support_code_type" name = "not_support_code_type"
code = 123 with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"):
version = 1 task = Python(name, script_code)
with patch( task.raw_script
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
with pytest.raises(
PyDSParamException, match="Parameter code do not support .*?"
):
Python(name, script_code)
def foo(): # noqa: D103 def foo(): # noqa: D103
@ -82,8 +86,8 @@ def foo(): # noqa: D103
), ),
], ],
) )
def test_python_to_dict(name, script_code, raw): def test_python_get_define(name, script_code, raw):
"""Test task python function to_dict.""" """Test task python function get_define."""
code = 123 code = 123
version = 1 version = 1
expect = { expect = {
@ -115,4 +119,4 @@ def test_python_to_dict(name, script_code, raw):
return_value=(code, version), return_value=(code, version),
): ):
shell = Python(name, script_code) shell = Python(name, script_code)
assert shell.to_dict() == expect assert shell.get_define() == expect

43
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py

@ -22,33 +22,40 @@ from unittest.mock import patch
import pytest import pytest
from pydolphinscheduler.tasks.shell import Shell, ShellTaskParams from pydolphinscheduler.tasks.shell import Shell
@pytest.mark.parametrize( @pytest.mark.parametrize(
"name, value", "attr, expect",
[ [
("local_params", "local_params"), (
("resource_list", "resource_list"), {"command": "test script"},
("dependence", "dependence"), {
("wait_start_timeout", "wait_start_timeout"), "rawScript": "test script",
("condition_result", "condition_result"), "localParams": [],
"resourceList": [],
"dependence": {},
"waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
},
)
], ],
) )
def test_shell_task_params_attr_setter(name, value): @patch(
"""Test shell task parameters.""" "pydolphinscheduler.core.task.Task.gen_code_and_version",
raw_script = "echo shell task parameter" return_value=(123, 1),
shell_task_params = ShellTaskParams(raw_script) )
assert raw_script == shell_task_params.raw_script def test_property_task_params(mock_code_version, attr, expect):
setattr(shell_task_params, name, value) """Test task shell task property."""
assert value == getattr(shell_task_params, name) task = Shell("test-shell-task-params", **attr)
assert expect == task.task_params
def test_shell_to_dict(): def test_shell_get_define():
"""Test task shell function to_dict.""" """Test task shell function get_define."""
code = 123 code = 123
version = 1 version = 1
name = "test_shell_to_dict" name = "test_shell_get_define"
command = "echo test shell" command = "echo test shell"
expect = { expect = {
"code": code, "code": code,
@ -79,4 +86,4 @@ def test_shell_to_dict():
return_value=(code, version), return_value=(code, version),
): ):
shell = Shell(name, command) shell = Shell(name, command)
assert shell.to_dict() == expect assert shell.get_define() == expect

42
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py

@ -82,12 +82,48 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}" ), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"
@pytest.mark.parametrize(
"attr, expect",
[
(
{"datasource_name": "datasource_name", "sql": "select 1"},
{
"sql": "select 1",
"type": "MYSQL",
"datasource": 1,
"sqlType": SqlType.SELECT,
"preStatements": [],
"postStatements": [],
"displayRows": 10,
"localParams": [],
"resourceList": [],
"dependence": {},
"waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
},
)
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
@patch(
"pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
return_value=({"id": 1, "type": "MYSQL"}),
)
def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
"""Test task sql task property."""
task = Sql("test-sql-task-params", **attr)
assert expect == task.task_params
@patch( @patch(
"pydolphinscheduler.tasks.sql.Sql.get_datasource_info", "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
return_value=({"id": 1, "type": "MYSQL"}), return_value=({"id": 1, "type": "MYSQL"}),
) )
def test_sql_to_dict(mock_datasource): def test_sql_get_define(mock_datasource):
"""Test task sql function to_dict.""" """Test task sql function get_define."""
code = 123 code = 123
version = 1 version = 1
name = "test_sql_dict" name = "test_sql_dict"
@ -128,4 +164,4 @@ def test_sql_to_dict(mock_datasource):
return_value=(code, version), return_value=(code, version),
): ):
task = Sql(name, datasource_name, command) task = Sql(name, datasource_name, command)
assert task.to_dict() == expect assert task.get_define() == expect

53
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py

@ -23,7 +23,7 @@ from unittest.mock import patch
import pytest import pytest
from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.sub_process import SubProcess, SubProcessTaskParams from pydolphinscheduler.tasks.sub_process import SubProcess
TEST_SUB_PROCESS_DEFINITION_NAME = "sub-test-process-definition" TEST_SUB_PROCESS_DEFINITION_NAME = "sub-test-process-definition"
TEST_SUB_PROCESS_DEFINITION_CODE = "3643589832320" TEST_SUB_PROCESS_DEFINITION_CODE = "3643589832320"
@ -31,22 +31,39 @@ TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
@pytest.mark.parametrize( @pytest.mark.parametrize(
"name, value", "attr, expect",
[ [
("local_params", "local_params"), (
("resource_list", "resource_list"), {"process_definition_name": TEST_SUB_PROCESS_DEFINITION_NAME},
("dependence", "dependence"), {
("wait_start_timeout", "wait_start_timeout"), "processDefinitionCode": TEST_SUB_PROCESS_DEFINITION_CODE,
("condition_result", "condition_result"), "localParams": [],
"resourceList": [],
"dependence": {},
"waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
},
)
], ],
) )
def test_sub_process_task_params_attr_setter(name, value): @patch(
"""Test sub_process task parameters.""" "pydolphinscheduler.tasks.sub_process.SubProcess.get_process_definition_info",
process_definition_code = "3643589832320" return_value=(
sub_process_task_params = SubProcessTaskParams(process_definition_code) {
assert process_definition_code == sub_process_task_params.process_definition_code "id": 1,
setattr(sub_process_task_params, name, value) "name": TEST_SUB_PROCESS_DEFINITION_NAME,
assert value == getattr(sub_process_task_params, name) "code": TEST_SUB_PROCESS_DEFINITION_CODE,
}
),
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_property_task_params(mock_code_version, mock_pd_info, attr, expect):
"""Test task sub process property."""
task = SubProcess("test-sub-process-task-params", **attr)
assert expect == task.task_params
@patch( @patch(
@ -59,11 +76,11 @@ def test_sub_process_task_params_attr_setter(name, value):
} }
), ),
) )
def test_sub_process_to_dict(mock_process_definition): def test_sub_process_get_define(mock_process_definition):
"""Test task sub_process function to_dict.""" """Test task sub_process function get_define."""
code = 123 code = 123
version = 1 version = 1
name = "test_sub_process_to_dict" name = "test_sub_process_get_define"
expect = { expect = {
"code": code, "code": code,
"name": name, "name": name,
@ -94,4 +111,4 @@ def test_sub_process_to_dict(mock_process_definition):
): ):
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME): with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME):
sub_process = SubProcess(name, TEST_SUB_PROCESS_DEFINITION_NAME) sub_process = SubProcess(name, TEST_SUB_PROCESS_DEFINITION_NAME)
assert sub_process.to_dict() == expect assert sub_process.get_define() == expect

Loading…
Cancel
Save