From 1948030151173c72c81c45346e37bba6dc35d44b Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Mon, 13 Dec 2021 19:54:24 +0800 Subject: [PATCH] [python] Add task base database and procedure (#7279) We add a new task procedure, and add parent class database for both sql task and procedure task fix: #6929 --- .../src/pydolphinscheduler/constants.py | 1 + .../src/pydolphinscheduler/tasks/database.py | 83 ++++++++++++ .../src/pydolphinscheduler/tasks/procedure.py | 43 ++++++ .../src/pydolphinscheduler/tasks/sql.py | 44 +----- .../tests/tasks/test_database.py | 127 ++++++++++++++++++ .../tests/tasks/test_procedure.py | 122 +++++++++++++++++ .../tests/tasks/test_sql.py | 2 +- 7 files changed, 381 insertions(+), 41 deletions(-) create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index 72034598a8..c2899abd26 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -72,6 +72,7 @@ class TaskType(str): PYTHON = "PYTHON" SQL = "SQL" SUB_PROCESS = "SUB_PROCESS" + PROCEDURE = "PROCEDURE" class DefaultTaskCodeNum(str): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py new file mode 100644 index 0000000000..686f57eae0 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task database base task.""" + +from typing import Dict + +from pydolphinscheduler.core.task import Task +from pydolphinscheduler.java_gateway import launch_gateway + + +class Database(Task): + """Base task to handle database, declare behavior for the base handler of database. + + It a parent class for all database task of dolphinscheduler. And it should run sql like + job in multiply sql lik engine, such as: + - ClickHouse + - DB2 + - HIVE + - MySQL + - Oracle + - Postgresql + - Presto + - SQLServer + You provider datasource_name contain connection information, it decisions which + database type and database instance would run this sql. + """ + + _task_custom_attr = {"sql"} + + def __init__( + self, task_type: str, name: str, datasource_name: str, sql: str, *args, **kwargs + ): + super().__init__(name, task_type, *args, **kwargs) + self.datasource_name = datasource_name + self.sql = sql + self._datasource = {} + + def get_datasource_type(self) -> str: + """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`.""" + return self.get_datasource_info(self.datasource_name).get("type") + + def get_datasource_id(self) -> str: + """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`.""" + return self.get_datasource_info(self.datasource_name).get("id") + + def get_datasource_info(self, name) -> Dict: + """Get datasource info from java gateway, contains datasource id, type, name.""" + if self._datasource: + return self._datasource + else: + gateway = launch_gateway() + self._datasource = gateway.entry_point.getDatasourceInfo(name) + return self._datasource + + @property + def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: + """Override Task.task_params for sql task. + + Sql task have some specials attribute for task_params, and is odd if we + directly set as python property, so we Override Task.task_params here. + """ + params = super().task_params + custom_params = { + "type": self.get_datasource_type(), + "datasource": self.get_datasource_id(), + } + params.update(custom_params) + return params diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py new file mode 100644 index 0000000000..f9497be659 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task procedure.""" + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.tasks.database import Database + + +class Procedure(Database): + """Task Procedure object, declare behavior for Procedure task to dolphinscheduler. + + It should run database procedure job in multiply sql lik engine, such as: + - ClickHouse + - DB2 + - HIVE + - MySQL + - Oracle + - Postgresql + - Presto + - SQLServer + You provider datasource_name contain connection information, it decisions which + database type and database instance would run this sql. + """ + + def __init__(self, name: str, datasource_name: str, sql: str, *args, **kwargs): + super().__init__( + TaskType.PROCEDURE, name, datasource_name, sql, *args, **kwargs + ) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py index f16eb10509..b6ee745850 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py @@ -18,11 +18,10 @@ """Task sql.""" import re -from typing import Dict, Optional +from typing import Optional from pydolphinscheduler.constants import TaskType -from pydolphinscheduler.core.task import Task -from pydolphinscheduler.java_gateway import launch_gateway +from pydolphinscheduler.tasks.database import Database class SqlType: @@ -32,7 +31,7 @@ class SqlType: NOT_SELECT = 1 -class Sql(Task): +class Sql(Database): """Task SQL object, declare behavior for SQL task to dolphinscheduler. It should run sql job in multiply sql lik engine, such as: @@ -67,30 +66,10 @@ class Sql(Task): *args, **kwargs ): - super().__init__(name, TaskType.SQL, *args, **kwargs) - self.datasource_name = datasource_name - self.sql = sql + super().__init__(TaskType.SQL, name, datasource_name, sql, *args, **kwargs) self.pre_statements = pre_statements or [] self.post_statements = post_statements or [] self.display_rows = display_rows - self._datasource = {} - - def get_datasource_type(self) -> str: - """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`.""" - return self.get_datasource_info(self.datasource_name).get("type") - - def get_datasource_id(self) -> str: - """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`.""" - return self.get_datasource_info(self.datasource_name).get("id") - - def get_datasource_info(self, name) -> Dict: - """Get datasource info from java gateway, contains datasource id, type, name.""" - if self._datasource: - return self._datasource - else: - gateway = launch_gateway() - self._datasource = gateway.entry_point.getDatasourceInfo(name) - return self._datasource @property def sql_type(self) -> int: @@ -103,18 +82,3 @@ class Sql(Task): return SqlType.NOT_SELECT else: return SqlType.SELECT - - @property - def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: - """Override Task.task_params for sql task. - - Sql task have some specials attribute for task_params, and is odd if we - directly set as python property, so we Override Task.task_params here. - """ - params = super().task_params - custom_params = { - "type": self.get_datasource_type(), - "datasource": self.get_datasource_id(), - } - params.update(custom_params) - return params diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py new file mode 100644 index 0000000000..dd510a8786 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task Database.""" + + +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.tasks.database import Database + +TEST_DATABASE_TASK_TYPE = "SQL" +TEST_DATABASE_SQL = "select 1" +TEST_DATABASE_DATASOURCE_NAME = "test_datasource" + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.tasks.database.Database.get_datasource_info", + return_value=({"id": 1, "type": "mock_type"}), +) +def test_get_datasource_detail(mock_datasource, mock_code_version): + """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value.""" + name = "test_get_database_detail" + task = Database( + TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL + ) + assert 1 == task.get_datasource_id() + assert "mock_type" == task.get_datasource_type() + + +@pytest.mark.parametrize( + "attr, expect", + [ + ( + { + "task_type": TEST_DATABASE_TASK_TYPE, + "name": "test-task-params", + "datasource_name": TEST_DATABASE_DATASOURCE_NAME, + "sql": TEST_DATABASE_SQL, + }, + { + "type": "MYSQL", + "datasource": 1, + "sql": TEST_DATABASE_SQL, + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ) + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.tasks.database.Database.get_datasource_info", + return_value=({"id": 1, "type": "MYSQL"}), +) +def test_property_task_params(mock_datasource, mock_code_version, attr, expect): + """Test task database task property.""" + task = Database(**attr) + assert expect == task.task_params + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.tasks.database.Database.get_datasource_info", + return_value=({"id": 1, "type": "MYSQL"}), +) +def test_database_get_define(mock_datasource, mock_code_version): + """Test task database function get_define.""" + name = "test_database_get_define" + expect = { + "code": 123, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": TEST_DATABASE_TASK_TYPE, + "taskParams": { + "type": "MYSQL", + "datasource": 1, + "sql": TEST_DATABASE_SQL, + "localParams": [], + "resourceList": [], + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + task = Database( + TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL + ) + assert task.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py new file mode 100644 index 0000000000..42c38e1233 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task Procedure.""" + +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.tasks.procedure import Procedure + +TEST_PROCEDURE_SQL = ( + 'create procedure HelloWorld() selece "hello world"; call HelloWorld();' +) +TEST_PROCEDURE_DATASOURCE_NAME = "test_datasource" + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info", + return_value=({"id": 1, "type": "mock_type"}), +) +def test_get_datasource_detail(mock_datasource, mock_code_version): + """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value.""" + name = "test_get_datasource_detail" + task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL) + assert 1 == task.get_datasource_id() + assert "mock_type" == task.get_datasource_type() + + +@pytest.mark.parametrize( + "attr, expect", + [ + ( + { + "name": "test-procedure-task-params", + "datasource_name": TEST_PROCEDURE_DATASOURCE_NAME, + "sql": TEST_PROCEDURE_SQL, + }, + { + "sql": TEST_PROCEDURE_SQL, + "type": "MYSQL", + "datasource": 1, + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ) + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info", + return_value=({"id": 1, "type": "MYSQL"}), +) +def test_property_task_params(mock_datasource, mock_code_version, attr, expect): + """Test task sql task property.""" + task = Procedure(**attr) + assert expect == task.task_params + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info", + return_value=({"id": 1, "type": "MYSQL"}), +) +def test_sql_get_define(mock_datasource, mock_code_version): + """Test task procedure function get_define.""" + name = "test_procedure_get_define" + expect = { + "code": 123, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "PROCEDURE", + "taskParams": { + "type": "MYSQL", + "datasource": 1, + "sql": TEST_PROCEDURE_SQL, + "localParams": [], + "resourceList": [], + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL) + assert task.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py index 2590100ae1..6058cce8ed 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py @@ -126,7 +126,7 @@ def test_sql_get_define(mock_datasource): """Test task sql function get_define.""" code = 123 version = 1 - name = "test_sql_dict" + name = "test_sql_get_define" command = "select 1" datasource_name = "test_datasource" expect = {