From e50a32d9ccee57e4b56f63b9bee2c4fd888c0c32 Mon Sep 17 00:00:00 2001 From: JieguangZhou Date: Sat, 17 Sep 2022 15:24:17 +0800 Subject: [PATCH] [feat][python] Support OpenMLDB task in python api (#11944) Using the whole word zookeeper instead of short cut zk Co-authored-by: Jiajie Zhong --- .../docs/source/tasks/index.rst | 1 + .../docs/source/tasks/openmldb.rst | 42 +++++++++++ .../examples/yaml_define/OpenMLDB.yaml | 33 +++++++++ .../src/pydolphinscheduler/constants.py | 1 + .../examples/task_openmldb_example.py | 43 +++++++++++ .../src/pydolphinscheduler/tasks/__init__.py | 2 + .../src/pydolphinscheduler/tasks/openmldb.py | 48 ++++++++++++ .../tests/tasks/test_openmldb.py | 73 +++++++++++++++++++ 8 files changed, 243 insertions(+) create mode 100644 dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/openmldb.rst create mode 100644 dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/OpenMLDB.yaml create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_openmldb_example.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/openmldb.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_openmldb.py diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst index a13652a526..5b9c165700 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst @@ -42,4 +42,5 @@ In this section sub_process sagemaker + openmldb pytorch diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/openmldb.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/openmldb.rst new file mode 100644 index 0000000000..125313dc21 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/openmldb.rst @@ -0,0 +1,42 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +OpenMLDB +========= + + +A OpenMLDB task type's example and dive into information of **PyDolphinScheduler**. + +Example +------- + +.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_openmldb_example.py + :start-after: [start workflow_declare] + :end-before: [end workflow_declare] + +Dive Into +--------- + +.. automodule:: pydolphinscheduler.tasks.openmldb + + +YAML file example +----------------- + +.. literalinclude:: ../../../examples/yaml_define/OpenMLDB.yaml + :start-after: # under the License. + :language: yaml diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/OpenMLDB.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/OpenMLDB.yaml new file mode 100644 index 0000000000..b455cb0768 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/OpenMLDB.yaml @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Define the workflow +workflow: + name: "OpenMLDB" + +# Define the tasks under the workflow +tasks: + - name: OpenMLDB + task_type: OpenMLDB + zookeeper: "127.0.0.1:2181" + zookeeper_path: "/openmldb" + execute_mode: "online" + sql: | + USE demo_db; + set @@job_timeout=200000; + LOAD DATA INFILE 'file:///tmp/train_sample.csv' + INTO TABLE talkingdata OPTIONS(mode='overwrite'); diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index 7eb5d04210..d8d2febfeb 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -58,6 +58,7 @@ class TaskType(str): SPARK = "SPARK" MR = "MR" SAGEMAKER = "SAGEMAKER" + OPENMLDB = "OPENMLDB" PYTORCH = "PYTORCH" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_openmldb_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_openmldb_example.py new file mode 100644 index 0000000000..5b90091ecf --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_openmldb_example.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# [start workflow_declare] +"""A example workflow for task openmldb.""" + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.tasks.openmldb import OpenMLDB + +sql = """USE demo_db; +set @@job_timeout=200000; +LOAD DATA INFILE 'file:///tmp/train_sample.csv' +INTO TABLE talkingdata OPTIONS(mode='overwrite'); +""" + +with ProcessDefinition( + name="task_openmldb_example", + tenant="tenant_exists", +) as pd: + task_openmldb = OpenMLDB( + name="task_openmldb", + zookeeper="127.0.0.1:2181", + zookeeper_path="/openmldb", + execute_mode="offline", + sql=sql, + ) + + pd.run() +# [end workflow_declare] diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py index 1481722433..e5b263c7c2 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py @@ -23,6 +23,7 @@ from pydolphinscheduler.tasks.dependent import Dependent from pydolphinscheduler.tasks.flink import Flink from pydolphinscheduler.tasks.http import Http from pydolphinscheduler.tasks.map_reduce import MR +from pydolphinscheduler.tasks.openmldb import OpenMLDB from pydolphinscheduler.tasks.procedure import Procedure from pydolphinscheduler.tasks.python import Python from pydolphinscheduler.tasks.pytorch import Pytorch @@ -41,6 +42,7 @@ __all__ = [ "Flink", "Http", "MR", + "OpenMLDB", "Procedure", "Python", "Pytorch", diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/openmldb.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/openmldb.py new file mode 100644 index 0000000000..5dad36ec11 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/openmldb.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task OpenMLDB.""" + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.task import Task + + +class OpenMLDB(Task): + """Task OpenMLDB object, declare behavior for OpenMLDB task to dolphinscheduler. + + :param name: task name + :param zookeeper: OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181. + :param zookeeper_path: OpenMLDB cluster zookeeper path, e.g. /openmldb. + :param execute_mode: Determine the init mode, offline or online. You can switch it in sql statementself. + :param sql: SQL statement. + """ + + _task_custom_attr = { + "zk", + "zk_path", + "execute_mode", + "sql", + } + + def __init__( + self, name, zookeeper, zookeeper_path, execute_mode, sql, *args, **kwargs + ): + super().__init__(name, TaskType.OPENMLDB, *args, **kwargs) + self.zk = zookeeper + self.zk_path = zookeeper_path + self.execute_mode = execute_mode + self.sql = sql diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_openmldb.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_openmldb.py new file mode 100644 index 0000000000..f580ab06b2 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_openmldb.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task OpenMLDB.""" +from unittest.mock import patch + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.tasks.openmldb import OpenMLDB + + +def test_openmldb_get_define(): + """Test task openmldb function get_define.""" + zookeeper = "127.0.0.1:2181" + zookeeper_path = "/openmldb" + execute_mode = "offline" + + sql = """USE demo_db; + set @@job_timeout=200000; + LOAD DATA INFILE 'file:///tmp/train_sample.csv' + INTO TABLE talkingdata OPTIONS(mode='overwrite'); + """ + + code = 123 + version = 1 + name = "test_openmldb_get_define" + expect = { + "code": code, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": TaskType.OPENMLDB, + "taskParams": { + "resourceList": [], + "localParams": [], + "zk": zookeeper, + "zkPath": zookeeper_path, + "executeMode": execute_mode, + "sql": sql, + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "environmentCode": None, + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + openmldb = OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql) + assert openmldb.get_define() == expect