Browse Source

[feat][python] Support OpenMLDB task in python api (#11944)

Using the whole word zookeeper instead of short cut zk
Co-authored-by: Jiajie Zhong <zhongjiajie955@gmail.com>
3.2.0-release
JieguangZhou 2 years ago committed by GitHub
parent
commit
e50a32d9cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
  2. 42
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/openmldb.rst
  3. 33
      dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/OpenMLDB.yaml
  4. 1
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  5. 43
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_openmldb_example.py
  6. 2
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
  7. 48
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/openmldb.py
  8. 73
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_openmldb.py

1
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst

@ -42,4 +42,5 @@ In this section
sub_process
sagemaker
openmldb
pytorch

42
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/openmldb.rst

@ -0,0 +1,42 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
OpenMLDB
=========
A OpenMLDB task type's example and dive into information of **PyDolphinScheduler**.
Example
-------
.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_openmldb_example.py
:start-after: [start workflow_declare]
:end-before: [end workflow_declare]
Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.openmldb
YAML file example
-----------------
.. literalinclude:: ../../../examples/yaml_define/OpenMLDB.yaml
:start-after: # under the License.
:language: yaml

33
dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/OpenMLDB.yaml

@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Define the workflow
workflow:
name: "OpenMLDB"
# Define the tasks under the workflow
tasks:
- name: OpenMLDB
task_type: OpenMLDB
zookeeper: "127.0.0.1:2181"
zookeeper_path: "/openmldb"
execute_mode: "online"
sql: |
USE demo_db;
set @@job_timeout=200000;
LOAD DATA INFILE 'file:///tmp/train_sample.csv'
INTO TABLE talkingdata OPTIONS(mode='overwrite');

1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -58,6 +58,7 @@ class TaskType(str):
SPARK = "SPARK"
MR = "MR"
SAGEMAKER = "SAGEMAKER"
OPENMLDB = "OPENMLDB"
PYTORCH = "PYTORCH"

43
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_openmldb_example.py

@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# [start workflow_declare]
"""A example workflow for task openmldb."""
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.openmldb import OpenMLDB
sql = """USE demo_db;
set @@job_timeout=200000;
LOAD DATA INFILE 'file:///tmp/train_sample.csv'
INTO TABLE talkingdata OPTIONS(mode='overwrite');
"""
with ProcessDefinition(
name="task_openmldb_example",
tenant="tenant_exists",
) as pd:
task_openmldb = OpenMLDB(
name="task_openmldb",
zookeeper="127.0.0.1:2181",
zookeeper_path="/openmldb",
execute_mode="offline",
sql=sql,
)
pd.run()
# [end workflow_declare]

2
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py

@ -23,6 +23,7 @@ from pydolphinscheduler.tasks.dependent import Dependent
from pydolphinscheduler.tasks.flink import Flink
from pydolphinscheduler.tasks.http import Http
from pydolphinscheduler.tasks.map_reduce import MR
from pydolphinscheduler.tasks.openmldb import OpenMLDB
from pydolphinscheduler.tasks.procedure import Procedure
from pydolphinscheduler.tasks.python import Python
from pydolphinscheduler.tasks.pytorch import Pytorch
@ -41,6 +42,7 @@ __all__ = [
"Flink",
"Http",
"MR",
"OpenMLDB",
"Procedure",
"Python",
"Pytorch",

48
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/openmldb.py

@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task OpenMLDB."""
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
class OpenMLDB(Task):
"""Task OpenMLDB object, declare behavior for OpenMLDB task to dolphinscheduler.
:param name: task name
:param zookeeper: OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181.
:param zookeeper_path: OpenMLDB cluster zookeeper path, e.g. /openmldb.
:param execute_mode: Determine the init mode, offline or online. You can switch it in sql statementself.
:param sql: SQL statement.
"""
_task_custom_attr = {
"zk",
"zk_path",
"execute_mode",
"sql",
}
def __init__(
self, name, zookeeper, zookeeper_path, execute_mode, sql, *args, **kwargs
):
super().__init__(name, TaskType.OPENMLDB, *args, **kwargs)
self.zk = zookeeper
self.zk_path = zookeeper_path
self.execute_mode = execute_mode
self.sql = sql

73
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_openmldb.py

@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test Task OpenMLDB."""
from unittest.mock import patch
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.tasks.openmldb import OpenMLDB
def test_openmldb_get_define():
"""Test task openmldb function get_define."""
zookeeper = "127.0.0.1:2181"
zookeeper_path = "/openmldb"
execute_mode = "offline"
sql = """USE demo_db;
set @@job_timeout=200000;
LOAD DATA INFILE 'file:///tmp/train_sample.csv'
INTO TABLE talkingdata OPTIONS(mode='overwrite');
"""
code = 123
version = 1
name = "test_openmldb_get_define"
expect = {
"code": code,
"name": name,
"version": 1,
"description": None,
"delayTime": 0,
"taskType": TaskType.OPENMLDB,
"taskParams": {
"resourceList": [],
"localParams": [],
"zk": zookeeper,
"zkPath": zookeeper_path,
"executeMode": execute_mode,
"sql": sql,
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
},
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0,
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
openmldb = OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql)
assert openmldb.get_define() == expect
Loading…
Cancel
Save