Browse Source
Using the whole word zookeeper instead of short cut zk
Co-authored-by: Jiajie Zhong <zhongjiajie955@gmail.com>
(cherry picked from commit e50a32d9cc
)
3.1.0-release
JieguangZhou
2 years ago
committed by
Jiajie Zhong
8 changed files with 243 additions and 0 deletions
@ -0,0 +1,42 @@
|
||||
.. Licensed to the Apache Software Foundation (ASF) under one |
||||
or more contributor license agreements. See the NOTICE file |
||||
distributed with this work for additional information |
||||
regarding copyright ownership. The ASF licenses this file |
||||
to you under the Apache License, Version 2.0 (the |
||||
"License"); you may not use this file except in compliance |
||||
with the License. You may obtain a copy of the License at |
||||
|
||||
.. http://www.apache.org/licenses/LICENSE-2.0 |
||||
|
||||
.. Unless required by applicable law or agreed to in writing, |
||||
software distributed under the License is distributed on an |
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||
KIND, either express or implied. See the License for the |
||||
specific language governing permissions and limitations |
||||
under the License. |
||||
|
||||
OpenMLDB |
||||
========= |
||||
|
||||
|
||||
A OpenMLDB task type's example and dive into information of **PyDolphinScheduler**. |
||||
|
||||
Example |
||||
------- |
||||
|
||||
.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_openmldb_example.py |
||||
:start-after: [start workflow_declare] |
||||
:end-before: [end workflow_declare] |
||||
|
||||
Dive Into |
||||
--------- |
||||
|
||||
.. automodule:: pydolphinscheduler.tasks.openmldb |
||||
|
||||
|
||||
YAML file example |
||||
----------------- |
||||
|
||||
.. literalinclude:: ../../../examples/yaml_define/OpenMLDB.yaml |
||||
:start-after: # under the License. |
||||
:language: yaml |
@ -0,0 +1,33 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one |
||||
# or more contributor license agreements. See the NOTICE file |
||||
# distributed with this work for additional information |
||||
# regarding copyright ownership. The ASF licenses this file |
||||
# to you under the Apache License, Version 2.0 (the |
||||
# "License"); you may not use this file except in compliance |
||||
# with the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, |
||||
# software distributed under the License is distributed on an |
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||
# KIND, either express or implied. See the License for the |
||||
# specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
# Define the workflow |
||||
workflow: |
||||
name: "OpenMLDB" |
||||
|
||||
# Define the tasks under the workflow |
||||
tasks: |
||||
- name: OpenMLDB |
||||
task_type: OpenMLDB |
||||
zookeeper: "127.0.0.1:2181" |
||||
zookeeper_path: "/openmldb" |
||||
execute_mode: "online" |
||||
sql: | |
||||
USE demo_db; |
||||
set @@job_timeout=200000; |
||||
LOAD DATA INFILE 'file:///tmp/train_sample.csv' |
||||
INTO TABLE talkingdata OPTIONS(mode='overwrite'); |
@ -0,0 +1,43 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one |
||||
# or more contributor license agreements. See the NOTICE file |
||||
# distributed with this work for additional information |
||||
# regarding copyright ownership. The ASF licenses this file |
||||
# to you under the Apache License, Version 2.0 (the |
||||
# "License"); you may not use this file except in compliance |
||||
# with the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, |
||||
# software distributed under the License is distributed on an |
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||
# KIND, either express or implied. See the License for the |
||||
# specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
# [start workflow_declare] |
||||
"""A example workflow for task openmldb.""" |
||||
|
||||
from pydolphinscheduler.core.process_definition import ProcessDefinition |
||||
from pydolphinscheduler.tasks.openmldb import OpenMLDB |
||||
|
||||
sql = """USE demo_db; |
||||
set @@job_timeout=200000; |
||||
LOAD DATA INFILE 'file:///tmp/train_sample.csv' |
||||
INTO TABLE talkingdata OPTIONS(mode='overwrite'); |
||||
""" |
||||
|
||||
with ProcessDefinition( |
||||
name="task_openmldb_example", |
||||
tenant="tenant_exists", |
||||
) as pd: |
||||
task_openmldb = OpenMLDB( |
||||
name="task_openmldb", |
||||
zookeeper="127.0.0.1:2181", |
||||
zookeeper_path="/openmldb", |
||||
execute_mode="offline", |
||||
sql=sql, |
||||
) |
||||
|
||||
pd.run() |
||||
# [end workflow_declare] |
@ -0,0 +1,48 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one |
||||
# or more contributor license agreements. See the NOTICE file |
||||
# distributed with this work for additional information |
||||
# regarding copyright ownership. The ASF licenses this file |
||||
# to you under the Apache License, Version 2.0 (the |
||||
# "License"); you may not use this file except in compliance |
||||
# with the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, |
||||
# software distributed under the License is distributed on an |
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||
# KIND, either express or implied. See the License for the |
||||
# specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
"""Task OpenMLDB.""" |
||||
|
||||
from pydolphinscheduler.constants import TaskType |
||||
from pydolphinscheduler.core.task import Task |
||||
|
||||
|
||||
class OpenMLDB(Task): |
||||
"""Task OpenMLDB object, declare behavior for OpenMLDB task to dolphinscheduler. |
||||
|
||||
:param name: task name |
||||
:param zookeeper: OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181. |
||||
:param zookeeper_path: OpenMLDB cluster zookeeper path, e.g. /openmldb. |
||||
:param execute_mode: Determine the init mode, offline or online. You can switch it in sql statementself. |
||||
:param sql: SQL statement. |
||||
""" |
||||
|
||||
_task_custom_attr = { |
||||
"zk", |
||||
"zk_path", |
||||
"execute_mode", |
||||
"sql", |
||||
} |
||||
|
||||
def __init__( |
||||
self, name, zookeeper, zookeeper_path, execute_mode, sql, *args, **kwargs |
||||
): |
||||
super().__init__(name, TaskType.OPENMLDB, *args, **kwargs) |
||||
self.zk = zookeeper |
||||
self.zk_path = zookeeper_path |
||||
self.execute_mode = execute_mode |
||||
self.sql = sql |
@ -0,0 +1,73 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one |
||||
# or more contributor license agreements. See the NOTICE file |
||||
# distributed with this work for additional information |
||||
# regarding copyright ownership. The ASF licenses this file |
||||
# to you under the Apache License, Version 2.0 (the |
||||
# "License"); you may not use this file except in compliance |
||||
# with the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, |
||||
# software distributed under the License is distributed on an |
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||
# KIND, either express or implied. See the License for the |
||||
# specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
"""Test Task OpenMLDB.""" |
||||
from unittest.mock import patch |
||||
|
||||
from pydolphinscheduler.constants import TaskType |
||||
from pydolphinscheduler.tasks.openmldb import OpenMLDB |
||||
|
||||
|
||||
def test_openmldb_get_define(): |
||||
"""Test task openmldb function get_define.""" |
||||
zookeeper = "127.0.0.1:2181" |
||||
zookeeper_path = "/openmldb" |
||||
execute_mode = "offline" |
||||
|
||||
sql = """USE demo_db; |
||||
set @@job_timeout=200000; |
||||
LOAD DATA INFILE 'file:///tmp/train_sample.csv' |
||||
INTO TABLE talkingdata OPTIONS(mode='overwrite'); |
||||
""" |
||||
|
||||
code = 123 |
||||
version = 1 |
||||
name = "test_openmldb_get_define" |
||||
expect = { |
||||
"code": code, |
||||
"name": name, |
||||
"version": 1, |
||||
"description": None, |
||||
"delayTime": 0, |
||||
"taskType": TaskType.OPENMLDB, |
||||
"taskParams": { |
||||
"resourceList": [], |
||||
"localParams": [], |
||||
"zk": zookeeper, |
||||
"zkPath": zookeeper_path, |
||||
"executeMode": execute_mode, |
||||
"sql": sql, |
||||
"dependence": {}, |
||||
"conditionResult": {"successNode": [""], "failedNode": [""]}, |
||||
"waitStartTimeout": {}, |
||||
}, |
||||
"flag": "YES", |
||||
"taskPriority": "MEDIUM", |
||||
"workerGroup": "default", |
||||
"environmentCode": None, |
||||
"failRetryTimes": 0, |
||||
"failRetryInterval": 1, |
||||
"timeoutFlag": "CLOSE", |
||||
"timeoutNotifyStrategy": None, |
||||
"timeout": 0, |
||||
} |
||||
with patch( |
||||
"pydolphinscheduler.core.task.Task.gen_code_and_version", |
||||
return_value=(code, version), |
||||
): |
||||
openmldb = OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql) |
||||
assert openmldb.get_define() == expect |
Loading…
Reference in new issue