diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py index d636d512e1..690351ab2f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py @@ -26,11 +26,14 @@ from pydolphinscheduler.utils.string import attr2camel class Base: """DolphinScheduler Base object.""" + # Object key attribute, to test whether object equals and so on. _KEY_ATTR: set = {"name", "description"} - _TO_DICT_ATTR: set = set() + # Object defines attribute, use when needs to communicate with Java gateway server. + _DEFINE_ATTR: set = set() - DEFAULT_ATTR: Dict = {} + # Object default attribute, will add those attribute to `_DEFINE_ATTR` when init assign missing. + _DEFAULT_ATTR: Dict = {} def __init__(self, name: str, description: Optional[str] = None): self.name = name @@ -44,28 +47,28 @@ class Base: getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR ) - # TODO check how Redash do - # TODO DRY - def to_dict(self, camel_attr=True) -> Dict: - """Get object key attribute dict. - - use attribute `self._TO_DICT_ATTR` to determine which attributes should including to - children `to_dict` function. - """ - # content = {} - # for attr, value in self.__dict__.items(): - # # Don't publish private variables - # if attr.startswith("_"): - # continue - # else: - # content[snake2camel(attr)] = value - # content.update(self.DEFAULT_ATTR) - # return content + def get_define_custom( + self, camel_attr: bool = True, custom_attr: set = None + ) -> Dict: + """Get object definition attribute by given attr set.""" content = {} - for attr in self._TO_DICT_ATTR: + for attr in custom_attr: val = getattr(self, attr, None) if camel_attr: content[attr2camel(attr)] = val else: content[attr] = val return content + + def get_define(self, camel_attr: bool = True) -> Dict: + """Get object definition attribute communicate to Java gateway server. + + use attribute `self._DEFINE_ATTR` to determine which attributes should including when + object tries to communicate with Java gateway server. + """ + content = self.get_define_custom(camel_attr, self._DEFINE_ATTR) + update_default = { + k: self._DEFAULT_ATTR.get(k) for k in self._DEFAULT_ATTR if k not in content + } + content.update(update_default) + return content diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index 1586757698..70d4e6b8e0 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -68,7 +68,7 @@ class ProcessDefinition(Base): "param", } - _TO_DICT_ATTR = { + _DEFINE_ATTR = { "name", "description", "_project", @@ -195,7 +195,7 @@ class ProcessDefinition(Base): if not self.tasks: return [self.tasks] else: - return [task.to_dict() for task in self.tasks.values()] + return [task.get_define() for task in self.tasks.values()] @property def task_relation_json(self) -> List[Dict]: @@ -204,7 +204,7 @@ class ProcessDefinition(Base): return [self.tasks] else: self._handle_root_relation() - return [tr.to_dict() for tr in self._task_relations] + return [tr.get_define() for tr in self._task_relations] @property def schedule_json(self) -> Optional[Dict]: diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index d22e5c8a41..39cca9c0b9 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -"""DolphinScheduler ObjectJsonBase, TaskParams and Task object.""" +"""DolphinScheduler Task and TaskRelation object.""" import logging from typing import Dict, List, Optional, Sequence, Set, Tuple, Union @@ -32,61 +32,17 @@ from pydolphinscheduler.core.process_definition import ( ProcessDefinitionContext, ) from pydolphinscheduler.java_gateway import launch_gateway -from pydolphinscheduler.utils.string import class_name2camel, snake2camel -class ObjectJsonBase: - """Task base class, define `__str__` and `to_dict` function would be use in other task related class.""" - - DEFAULT_ATTR = {} - - def __int__(self, *args, **kwargs): - pass - - def __str__(self) -> str: - content = [] - for attribute, value in self.__dict__.items(): - content.append(f'"{snake2camel(attribute)}": {value}') - content = ",".join(content) - return f'"{class_name2camel(type(self).__name__)}":{{{content}}}' - - # TODO check how Redash do - # TODO DRY - def to_dict(self) -> Dict: - """Get object key attribute dict which determine by attribute `DEFAULT_ATTR`.""" - content = {snake2camel(attr): value for attr, value in self.__dict__.items()} - content.update(self.DEFAULT_ATTR) - return content - - -class TaskParams(ObjectJsonBase): - """TaskParams object, describe the key parameter of a single task.""" - - DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]} - - def __init__( - self, - local_params: Optional[List] = None, - resource_list: Optional[List] = None, - dependence: Optional[Dict] = None, - wait_start_timeout: Optional[Dict] = None, - condition_result: Optional[Dict] = None, - *args, - **kwargs, - ): - super().__init__(*args, **kwargs) - self.local_params = local_params or [] - self.resource_list = resource_list or [] - self.dependence = dependence or {} - self.wait_start_timeout = wait_start_timeout or {} - # TODO need better way to handle it, this code just for POC - self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT - - -class TaskRelation(ObjectJsonBase): +class TaskRelation(Base): """TaskRelation object, describe the relation of exactly two tasks.""" - DEFAULT_ATTR = { + _DEFINE_ATTR = { + "pre_task_code", + "post_task_code", + } + + _DEFAULT_ATTR = { "name": "", "preTaskVersion": 1, "postTaskVersion": 1, @@ -98,8 +54,9 @@ class TaskRelation(ObjectJsonBase): self, pre_task_code: int, post_task_code: int, + name: Optional[str] = None, ): - super().__init__() + super().__init__(name) self.pre_task_code = pre_task_code self.post_task_code = post_task_code @@ -110,19 +67,32 @@ class TaskRelation(ObjectJsonBase): class Task(Base): """Task object, parent class for all exactly task type.""" - DEFAULT_DEPS_ATTR = { - "name": "", - "preTaskVersion": 1, - "postTaskVersion": 1, - "conditionType": 0, - "conditionParams": {}, + _DEFINE_ATTR = { + "name", + "code", + "version", + "task_type", + "task_params", + "description", + "flag", + "task_priority", + "worker_group", + "delay_time", + "fail_retry_times", + "fail_retry_interval", + "timeout_flag", + "timeout_notify_strategy", + "timeout", } + _task_custom_attr: set = set() + + DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]} + def __init__( self, name: str, task_type: str, - task_params: TaskParams, description: Optional[str] = None, flag: Optional[str] = TaskFlag.YES, task_priority: Optional[str] = TaskPriority.MEDIUM, @@ -134,11 +104,15 @@ class Task(Base): timeout_notify_strategy: Optional = None, timeout: Optional[int] = 0, process_definition: Optional[ProcessDefinition] = None, + local_params: Optional[List] = None, + resource_list: Optional[List] = None, + dependence: Optional[Dict] = None, + wait_start_timeout: Optional[Dict] = None, + condition_result: Optional[Dict] = None, ): super().__init__(name, description) self.task_type = task_type - self.task_params = task_params self.flag = flag self.task_priority = task_priority self.worker_group = worker_group @@ -169,6 +143,13 @@ class Task(Base): self.code, ) + # Attribute for task param + self.local_params = local_params or [] + self.resource_list = resource_list or [] + self.dependence = dependence or {} + self.wait_start_timeout = wait_start_timeout or {} + self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT + @property def process_definition(self) -> Optional[ProcessDefinition]: """Get attribute process_definition.""" @@ -179,6 +160,22 @@ class Task(Base): """Set attribute process_definition.""" self._process_definition = process_definition + @property + def task_params(self) -> Optional[Dict]: + """Get task parameter object. + + Will get result to combine _task_custom_attr and custom_attr. + """ + custom_attr = { + "local_params", + "resource_list", + "dependence", + "wait_start_timeout", + "condition_result", + } + custom_attr |= self._task_custom_attr + return self.get_define_custom(custom_attr=custom_attr) + def __hash__(self): return hash(self.code) @@ -259,16 +256,3 @@ class Task(Base): # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT) # gateway_result_checker(result) return result.get("code"), result.get("version") - - def to_dict(self, camel_attr=True) -> Dict: - """Task `to_dict` function which will return key attribute for Task object.""" - content = {} - for attr, value in self.__dict__.items(): - # Don't publish private variables - if attr.startswith("_"): - continue - elif isinstance(value, TaskParams): - content[snake2camel(attr)] = value.to_dict() - else: - content[snake2camel(attr)] = value - return content diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py index 445142e5b9..781333d481 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py @@ -20,7 +20,7 @@ from typing import Optional from pydolphinscheduler.constants import TaskType -from pydolphinscheduler.core.task import Task, TaskParams +from pydolphinscheduler.core.task import Task from pydolphinscheduler.exceptions import PyDSParamException @@ -50,11 +50,22 @@ class HttpCheckCondition: BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS" -class HttpTaskParams(TaskParams): - """Parameter only for Http task types.""" +class Http(Task): + """Task HTTP object, declare behavior for HTTP task to dolphinscheduler.""" + + _task_custom_attr = { + "url", + "http_method", + "http_params", + "http_check_condition", + "condition", + "connect_timeout", + "socket_timeout", + } def __init__( self, + name: str, url: str, http_method: Optional[str] = HttpMethod.GET, http_params: Optional[str] = None, @@ -65,7 +76,7 @@ class HttpTaskParams(TaskParams): *args, **kwargs ): - super().__init__(*args, **kwargs) + super().__init__(name, TaskType.HTTP, *args, **kwargs) self.url = url if not hasattr(HttpMethod, http_method): raise PyDSParamException( @@ -88,31 +99,3 @@ class HttpTaskParams(TaskParams): self.condition = condition self.connect_timeout = connect_timeout self.socket_timeout = socket_timeout - - -class Http(Task): - """Task HTTP object, declare behavior for HTTP task to dolphinscheduler.""" - - def __init__( - self, - name: str, - url: str, - http_method: Optional[str] = HttpMethod.GET, - http_params: Optional[str] = None, - http_check_condition: Optional[str] = HttpCheckCondition.STATUS_CODE_DEFAULT, - condition: Optional[str] = None, - connect_timeout: Optional[int] = 60000, - socket_timeout: Optional[int] = 60000, - *args, - **kwargs - ): - task_params = HttpTaskParams( - url=url, - http_method=http_method, - http_params=http_params, - http_check_condition=http_check_condition, - condition=condition, - connect_timeout=connect_timeout, - socket_timeout=socket_timeout, - ) - super().__init__(name, TaskType.HTTP, task_params, *args, **kwargs) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py index 9a7149a520..79504808c8 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py @@ -22,29 +22,30 @@ import types from typing import Any from pydolphinscheduler.constants import TaskType -from pydolphinscheduler.core.task import Task, TaskParams +from pydolphinscheduler.core.task import Task from pydolphinscheduler.exceptions import PyDSParamException -class PythonTaskParams(TaskParams): - """Parameter only for Python task types.""" - - def __init__(self, raw_script: str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.raw_script = raw_script - - class Python(Task): """Task Python object, declare behavior for Python task to dolphinscheduler.""" + _task_custom_attr = { + "raw_script", + } + def __init__(self, name: str, code: Any, *args, **kwargs): - if isinstance(code, str): - task_params = PythonTaskParams(raw_script=code) - elif isinstance(code, types.FunctionType): - py_function = inspect.getsource(code) - task_params = PythonTaskParams(raw_script=py_function) + super().__init__(name, TaskType.PYTHON, *args, **kwargs) + self._code = code + + @property + def raw_script(self) -> str: + """Get python task define attribute `raw_script`.""" + if isinstance(self._code, str): + return self._code + elif isinstance(self._code, types.FunctionType): + py_function = inspect.getsource(self._code) + return py_function else: raise PyDSParamException( - "Parameter code do not support % for now.", type(code) + "Parameter code do not support % for now.", type(self._code) ) - super().__init__(name, TaskType.PYTHON, task_params, *args, **kwargs) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py index 25e82f54f1..94ad2f4416 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py @@ -18,15 +18,7 @@ """Task shell.""" from pydolphinscheduler.constants import TaskType -from pydolphinscheduler.core.task import Task, TaskParams - - -class ShellTaskParams(TaskParams): - """Parameter only for shell task types.""" - - def __init__(self, raw_script: str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.raw_script = raw_script +from pydolphinscheduler.core.task import Task class Shell(Task): @@ -37,6 +29,10 @@ class Shell(Task): task.name assign to `task_shell` """ + _task_custom_attr = { + "raw_script", + } + def __init__(self, name: str, command: str, *args, **kwargs): - task_params = ShellTaskParams(raw_script=command) - super().__init__(name, TaskType.SHELL, task_params, *args, **kwargs) + super().__init__(name, TaskType.SHELL, *args, **kwargs) + self.raw_script = command diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py index 62da964d58..f16eb10509 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py @@ -21,7 +21,7 @@ import re from typing import Dict, Optional from pydolphinscheduler.constants import TaskType -from pydolphinscheduler.core.task import Task, TaskParams +from pydolphinscheduler.core.task import Task from pydolphinscheduler.java_gateway import launch_gateway @@ -32,31 +32,6 @@ class SqlType: NOT_SELECT = 1 -class SqlTaskParams(TaskParams): - """Parameter only for Sql task type.""" - - def __init__( - self, - type: str, - datasource: str, - sql: str, - sql_type: Optional[int] = SqlType.NOT_SELECT, - display_rows: Optional[int] = 10, - pre_statements: Optional[str] = None, - post_statements: Optional[str] = None, - *args, - **kwargs - ): - super().__init__(*args, **kwargs) - self.type = type - self.datasource = datasource - self.sql = sql - self.sql_type = sql_type - self.display_rows = display_rows - self.pre_statements = pre_statements or [] - self.post_statements = post_statements or [] - - class Sql(Task): """Task SQL object, declare behavior for SQL task to dolphinscheduler. @@ -73,38 +48,40 @@ class Sql(Task): database type and database instance would run this sql. """ + _task_custom_attr = { + "sql", + "sql_type", + "pre_statements", + "post_statements", + "display_rows", + } + def __init__( self, name: str, datasource_name: str, sql: str, - pre_sql: Optional[str] = None, - post_sql: Optional[str] = None, + pre_statements: Optional[str] = None, + post_statements: Optional[str] = None, display_rows: Optional[int] = 10, *args, **kwargs ): - self._sql = sql - self._datasource_name = datasource_name + super().__init__(name, TaskType.SQL, *args, **kwargs) + self.datasource_name = datasource_name + self.sql = sql + self.pre_statements = pre_statements or [] + self.post_statements = post_statements or [] + self.display_rows = display_rows self._datasource = {} - task_params = SqlTaskParams( - type=self.get_datasource_type(), - datasource=self.get_datasource_id(), - sql=sql, - sql_type=self.sql_type, - display_rows=display_rows, - pre_statements=pre_sql, - post_statements=post_sql, - ) - super().__init__(name, TaskType.SQL, task_params, *args, **kwargs) def get_datasource_type(self) -> str: """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`.""" - return self.get_datasource_info(self._datasource_name).get("type") + return self.get_datasource_info(self.datasource_name).get("type") def get_datasource_id(self) -> str: """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`.""" - return self.get_datasource_info(self._datasource_name).get("id") + return self.get_datasource_info(self.datasource_name).get("id") def get_datasource_info(self, name) -> Dict: """Get datasource info from java gateway, contains datasource id, type, name.""" @@ -122,7 +99,22 @@ class Sql(Task): "^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*" ) pattern_select = re.compile(pattern_select_str, re.IGNORECASE) - if pattern_select.match(self._sql) is None: + if pattern_select.match(self.sql) is None: return SqlType.NOT_SELECT else: return SqlType.SELECT + + @property + def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: + """Override Task.task_params for sql task. + + Sql task have some specials attribute for task_params, and is odd if we + directly set as python property, so we Override Task.task_params here. + """ + params = super().task_params + custom_params = { + "type": self.get_datasource_type(), + "datasource": self.get_datasource_id(), + } + params.update(custom_params) + return params diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py index 1bf0bd1136..8ba6b4c64d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py @@ -20,57 +20,36 @@ from typing import Dict from pydolphinscheduler.constants import TaskType -from pydolphinscheduler.core.process_definition import ProcessDefinitionContext -from pydolphinscheduler.core.task import Task, TaskParams +from pydolphinscheduler.core.task import Task from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException from pydolphinscheduler.java_gateway import launch_gateway -class SubProcessTaskParams(TaskParams): - """Parameter only for Sub Process task type.""" - - def __init__(self, process_definition_code, *args, **kwargs): - super().__init__(*args, **kwargs) - self.process_definition_code = process_definition_code - - class SubProcess(Task): """Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler.""" - def __init__(self, name: str, process_definition_name: str, *args, **kwargs): - self._process_definition_name = process_definition_name - self._process_definition_info = {} - # TODO: Optimize the way of obtaining process_definition - self.process_definition = kwargs.get( - "process_definition", ProcessDefinitionContext.get() - ) - if not self.process_definition: - raise PyDSProcessDefinitionNotAssignException( - "ProcessDefinition must be provider when SubProcess initialization." - ) + _task_custom_attr = {"process_definition_code"} - task_params = SubProcessTaskParams( - process_definition_code=self.get_process_definition_code(), - ) - super().__init__(name, TaskType.SUB_PROCESS, task_params, *args, **kwargs) + def __init__(self, name: str, process_definition_name: str, *args, **kwargs): + super().__init__(name, TaskType.SUB_PROCESS, *args, **kwargs) + self.process_definition_name = process_definition_name - def get_process_definition_code(self) -> str: + @property + def process_definition_code(self) -> str: """Get process definition code, a wrapper for :func:`get_process_definition_info`.""" - return self.get_process_definition_info(self._process_definition_name).get( + return self.get_process_definition_info(self.process_definition_name).get( "code" ) def get_process_definition_info(self, process_definition_name: str) -> Dict: """Get process definition info from java gateway, contains process definition id, name, code.""" - if self._process_definition_info: - return self._process_definition_info - else: - gateway = launch_gateway() - self._process_definition_info = ( - gateway.entry_point.getProcessDefinitionInfo( - self.process_definition.user.name, - self.process_definition.project.name, - process_definition_name, - ) + if not self.process_definition: + raise PyDSProcessDefinitionNotAssignException( + "ProcessDefinition must be provider for task SubProcess." ) - return self._process_definition_info + gateway = launch_gateway() + return gateway.entry_point.getProcessDefinitionInfo( + self.process_definition.user.name, + self.process_definition.project.name, + process_definition_name, + ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index 4c1597425d..8491878ea5 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -28,7 +28,6 @@ from pydolphinscheduler.constants import ( ProcessDefinitionReleaseState, ) from pydolphinscheduler.core.process_definition import ProcessDefinition -from pydolphinscheduler.core.task import TaskParams from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.side import Project, Tenant, User from pydolphinscheduler.utils.date import conv_to_schedule @@ -152,8 +151,8 @@ def test__parse_datetime_not_support_type(val: Any): pd._parse_datetime(val) -def test_process_definition_to_dict_without_task(): - """Test process definition function to_dict without task.""" +def test_process_definition_get_define_without_task(): + """Test process definition function get_define without task.""" expect = { "name": TEST_PROCESS_DEFINITION_NAME, "description": None, @@ -168,7 +167,7 @@ def test_process_definition_to_dict_without_task(): "taskRelationJson": [{}], } with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: - assert pd.to_dict() == expect + assert pd.get_define() == expect def test_process_definition_simple_context_manager(): @@ -176,10 +175,7 @@ def test_process_definition_simple_context_manager(): expect_tasks_num = 5 with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: for i in range(expect_tasks_num): - task_params = TaskParams() - curr_task = Task( - name=f"task-{i}", task_type=f"type-{i}", task_params=task_params - ) + curr_task = Task(name=f"task-{i}", task_type=f"type-{i}") # Set deps task i as i-1 parent if i > 0: pre_task = pd.get_one_task_by_name(f"task-{i - 1}") @@ -221,11 +217,9 @@ def test_process_definition_simple_separate(): expect_tasks_num = 5 pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) for i in range(expect_tasks_num): - task_params = TaskParams() curr_task = Task( name=f"task-{i}", task_type=f"type-{i}", - task_params=task_params, process_definition=pd, ) # Set deps task i as i-1 parent diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py index 6d09820543..b551f072b1 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py @@ -21,21 +21,49 @@ from unittest.mock import patch import pytest -from pydolphinscheduler.core.task import Task, TaskParams, TaskRelation +from pydolphinscheduler.core.task import Task, TaskRelation from tests.testing.task import Task as testTask -def test_task_params_to_dict(): - """Test TaskParams object function to_dict.""" - expect = { - "resourceList": [], - "localParams": [], - "dependence": {}, - "conditionResult": TaskParams.DEFAULT_CONDITION_RESULT, - "waitStartTimeout": {}, - } - task_param = TaskParams() - assert task_param.to_dict() == expect +@pytest.mark.parametrize( + "attr, expect", + [ + ( + dict(), + { + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ), + ( + { + "local_params": ["foo", "bar"], + "resource_list": ["foo", "bar"], + "dependence": {"foo", "bar"}, + "wait_start_timeout": {"foo", "bar"}, + "condition_result": {"foo": ["bar"]}, + }, + { + "localParams": ["foo", "bar"], + "resourceList": ["foo", "bar"], + "dependence": {"foo", "bar"}, + "waitStartTimeout": {"foo", "bar"}, + "conditionResult": {"foo": ["bar"]}, + }, + ), + ], +) +def test_property_task_params(attr, expect): + """Test class task property.""" + task = testTask( + "test-property-task-params", + "test-task", + **attr, + ) + assert expect == task.task_params def test_task_relation_to_dict(): @@ -54,15 +82,15 @@ def test_task_relation_to_dict(): task_param = TaskRelation( pre_task_code=pre_task_code, post_task_code=post_task_code ) - assert task_param.to_dict() == expect + assert task_param.get_define() == expect -def test_task_to_dict(): - """Test Task object function to_dict.""" +def test_task_get_define(): + """Test Task object function get_define.""" code = 123 version = 1 - name = "test_task_to_dict" - task_type = "test_task_to_dict_type" + name = "test_task_get_define" + task_type = "test_task_get_define_type" expect = { "code": code, "name": name, @@ -90,8 +118,8 @@ def test_task_to_dict(): "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(code, version), ): - task = Task(name=name, task_type=task_type, task_params=TaskParams()) - assert task.to_dict() == expect + task = Task(name=name, task_type=task_type) + assert task.get_define() == expect @pytest.mark.parametrize("shift", ["<<", ">>"]) @@ -100,8 +128,8 @@ def test_two_tasks_shift(shift: str): Here we test both `>>` and `<<` bit operator. """ - upstream = testTask(name="upstream", task_type=shift, task_params=TaskParams()) - downstream = testTask(name="downstream", task_type=shift, task_params=TaskParams()) + upstream = testTask(name="upstream", task_type=shift) + downstream = testTask(name="downstream", task_type=shift) if shift == "<<": downstream << upstream elif shift == ">>": @@ -137,10 +165,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str): "downstream": "upstream", } task_type = "dep_task_and_tasks" - task = testTask(name="upstream", task_type=task_type, task_params=TaskParams()) + task = testTask(name="upstream", task_type=task_type) tasks = [ - testTask(name="downstream1", task_type=task_type, task_params=TaskParams()), - testTask(name="downstream2", task_type=task_type, task_params=TaskParams()), + testTask(name="downstream1", task_type=task_type), + testTask(name="downstream2", task_type=task_type), ] # Use build-in function eval to simply test case and reduce duplicate code diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py index 7c01517c94..060cdec0b0 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py @@ -22,12 +22,7 @@ from unittest.mock import patch import pytest from pydolphinscheduler.exceptions import PyDSParamException -from pydolphinscheduler.tasks.http import ( - Http, - HttpCheckCondition, - HttpMethod, - HttpTaskParams, -) +from pydolphinscheduler.tasks.http import Http, HttpCheckCondition, HttpMethod @pytest.mark.parametrize( @@ -50,6 +45,38 @@ def test_attr_exists(class_name, attrs): assert all(hasattr(class_name, attr) for attr in attrs) +@pytest.mark.parametrize( + "attr, expect", + [ + ( + {"url": "https://www.apache.org"}, + { + "url": "https://www.apache.org", + "httpMethod": "GET", + "httpParams": [], + "httpCheckCondition": "STATUS_CODE_DEFAULT", + "condition": None, + "connectTimeout": 60000, + "socketTimeout": 60000, + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ) + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_property_task_params(mock_code_version, attr, expect): + """Test task http property.""" + task = Http("test-http-task-params", **attr) + assert expect == task.task_params + + @pytest.mark.parametrize( "param", [ @@ -62,18 +89,22 @@ def test_attr_exists(class_name, attrs): }, ], ) -def test_http_task_param_not_support_param(param): +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_http_task_param_not_support_param(mock_code, param): """Test HttpTaskParams not support parameter.""" url = "https://www.apache.org" with pytest.raises(PyDSParamException, match="Parameter .*?"): - HttpTaskParams(url, **param) + Http("test-no-supprot-param", url, **param) -def test_http_to_dict(): - """Test task HTTP function to_dict.""" +def test_http_get_define(): + """Test task HTTP function get_define.""" code = 123 version = 1 - name = "test_http_to_dict" + name = "test_http_get_define" url = "https://www.apache.org" expect = { "code": code, @@ -110,4 +141,4 @@ def test_http_to_dict(): return_value=(code, version), ): http = Http(name, url) - assert http.to_dict() == expect + assert http.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py index f9e7f04678..dbcd2986fb 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py @@ -23,26 +23,33 @@ from unittest.mock import patch import pytest from pydolphinscheduler.exceptions import PyDSParamException -from pydolphinscheduler.tasks.python import Python, PythonTaskParams +from pydolphinscheduler.tasks.python import Python @pytest.mark.parametrize( - "name, value", + "attr, expect", [ - ("local_params", "local_params"), - ("resource_list", "resource_list"), - ("dependence", "dependence"), - ("wait_start_timeout", "wait_start_timeout"), - ("condition_result", "condition_result"), + ( + {"code": "print(1)"}, + { + "rawScript": "print(1)", + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ) ], ) -def test_python_task_params_attr_setter(name, value): - """Test python task parameters.""" - command = 'print("hello world.")' - python_task_params = PythonTaskParams(command) - assert command == python_task_params.raw_script - setattr(python_task_params, name, value) - assert value == getattr(python_task_params, name) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_property_task_params(mock_code_version, attr, expect): + """Test task python property.""" + task = Python("test-python-task-params", **attr) + assert expect == task.task_params @pytest.mark.parametrize( @@ -52,19 +59,16 @@ def test_python_task_params_attr_setter(name, value): ("print", "hello world"), ], ) -def test_python_task_not_support_code(script_code): +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_python_task_not_support_code(mock_code, script_code): """Test python task parameters.""" name = "not_support_code_type" - code = 123 - version = 1 - with patch( - "pydolphinscheduler.core.task.Task.gen_code_and_version", - return_value=(code, version), - ): - with pytest.raises( - PyDSParamException, match="Parameter code do not support .*?" - ): - Python(name, script_code) + with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"): + task = Python(name, script_code) + task.raw_script def foo(): # noqa: D103 @@ -82,8 +86,8 @@ def foo(): # noqa: D103 ), ], ) -def test_python_to_dict(name, script_code, raw): - """Test task python function to_dict.""" +def test_python_get_define(name, script_code, raw): + """Test task python function get_define.""" code = 123 version = 1 expect = { @@ -115,4 +119,4 @@ def test_python_to_dict(name, script_code, raw): return_value=(code, version), ): shell = Python(name, script_code) - assert shell.to_dict() == expect + assert shell.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py index 56fae1c51f..e42f6dc0fb 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py @@ -22,33 +22,40 @@ from unittest.mock import patch import pytest -from pydolphinscheduler.tasks.shell import Shell, ShellTaskParams +from pydolphinscheduler.tasks.shell import Shell @pytest.mark.parametrize( - "name, value", + "attr, expect", [ - ("local_params", "local_params"), - ("resource_list", "resource_list"), - ("dependence", "dependence"), - ("wait_start_timeout", "wait_start_timeout"), - ("condition_result", "condition_result"), + ( + {"command": "test script"}, + { + "rawScript": "test script", + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ) ], ) -def test_shell_task_params_attr_setter(name, value): - """Test shell task parameters.""" - raw_script = "echo shell task parameter" - shell_task_params = ShellTaskParams(raw_script) - assert raw_script == shell_task_params.raw_script - setattr(shell_task_params, name, value) - assert value == getattr(shell_task_params, name) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_property_task_params(mock_code_version, attr, expect): + """Test task shell task property.""" + task = Shell("test-shell-task-params", **attr) + assert expect == task.task_params -def test_shell_to_dict(): - """Test task shell function to_dict.""" +def test_shell_get_define(): + """Test task shell function get_define.""" code = 123 version = 1 - name = "test_shell_to_dict" + name = "test_shell_get_define" command = "echo test shell" expect = { "code": code, @@ -79,4 +86,4 @@ def test_shell_to_dict(): return_value=(code, version), ): shell = Shell(name, command) - assert shell.to_dict() == expect + assert shell.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py index 499b46b4bb..2590100ae1 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py @@ -82,12 +82,48 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type): ), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}" +@pytest.mark.parametrize( + "attr, expect", + [ + ( + {"datasource_name": "datasource_name", "sql": "select 1"}, + { + "sql": "select 1", + "type": "MYSQL", + "datasource": 1, + "sqlType": SqlType.SELECT, + "preStatements": [], + "postStatements": [], + "displayRows": 10, + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ) + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.tasks.sql.Sql.get_datasource_info", + return_value=({"id": 1, "type": "MYSQL"}), +) +def test_property_task_params(mock_datasource, mock_code_version, attr, expect): + """Test task sql task property.""" + task = Sql("test-sql-task-params", **attr) + assert expect == task.task_params + + @patch( "pydolphinscheduler.tasks.sql.Sql.get_datasource_info", return_value=({"id": 1, "type": "MYSQL"}), ) -def test_sql_to_dict(mock_datasource): - """Test task sql function to_dict.""" +def test_sql_get_define(mock_datasource): + """Test task sql function get_define.""" code = 123 version = 1 name = "test_sql_dict" @@ -128,4 +164,4 @@ def test_sql_to_dict(mock_datasource): return_value=(code, version), ): task = Sql(name, datasource_name, command) - assert task.to_dict() == expect + assert task.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py index 4a5388a086..7f471a1b8b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py @@ -23,7 +23,7 @@ from unittest.mock import patch import pytest from pydolphinscheduler.core.process_definition import ProcessDefinition -from pydolphinscheduler.tasks.sub_process import SubProcess, SubProcessTaskParams +from pydolphinscheduler.tasks.sub_process import SubProcess TEST_SUB_PROCESS_DEFINITION_NAME = "sub-test-process-definition" TEST_SUB_PROCESS_DEFINITION_CODE = "3643589832320" @@ -31,22 +31,39 @@ TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition" @pytest.mark.parametrize( - "name, value", + "attr, expect", [ - ("local_params", "local_params"), - ("resource_list", "resource_list"), - ("dependence", "dependence"), - ("wait_start_timeout", "wait_start_timeout"), - ("condition_result", "condition_result"), + ( + {"process_definition_name": TEST_SUB_PROCESS_DEFINITION_NAME}, + { + "processDefinitionCode": TEST_SUB_PROCESS_DEFINITION_CODE, + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ) ], ) -def test_sub_process_task_params_attr_setter(name, value): - """Test sub_process task parameters.""" - process_definition_code = "3643589832320" - sub_process_task_params = SubProcessTaskParams(process_definition_code) - assert process_definition_code == sub_process_task_params.process_definition_code - setattr(sub_process_task_params, name, value) - assert value == getattr(sub_process_task_params, name) +@patch( + "pydolphinscheduler.tasks.sub_process.SubProcess.get_process_definition_info", + return_value=( + { + "id": 1, + "name": TEST_SUB_PROCESS_DEFINITION_NAME, + "code": TEST_SUB_PROCESS_DEFINITION_CODE, + } + ), +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_property_task_params(mock_code_version, mock_pd_info, attr, expect): + """Test task sub process property.""" + task = SubProcess("test-sub-process-task-params", **attr) + assert expect == task.task_params @patch( @@ -59,11 +76,11 @@ def test_sub_process_task_params_attr_setter(name, value): } ), ) -def test_sub_process_to_dict(mock_process_definition): - """Test task sub_process function to_dict.""" +def test_sub_process_get_define(mock_process_definition): + """Test task sub_process function get_define.""" code = 123 version = 1 - name = "test_sub_process_to_dict" + name = "test_sub_process_get_define" expect = { "code": code, "name": name, @@ -94,4 +111,4 @@ def test_sub_process_to_dict(mock_process_definition): ): with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME): sub_process = SubProcess(name, TEST_SUB_PROCESS_DEFINITION_NAME) - assert sub_process.to_dict() == expect + assert sub_process.get_define() == expect