JieguangZhou
2 years ago
committed by
GitHub
13 changed files with 486 additions and 97 deletions
@ -0,0 +1,41 @@ |
|||||||
|
.. Licensed to the Apache Software Foundation (ASF) under one |
||||||
|
or more contributor license agreements. See the NOTICE file |
||||||
|
distributed with this work for additional information |
||||||
|
regarding copyright ownership. The ASF licenses this file |
||||||
|
to you under the Apache License, Version 2.0 (the |
||||||
|
"License"); you may not use this file except in compliance |
||||||
|
with the License. You may obtain a copy of the License at |
||||||
|
|
||||||
|
.. http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
|
||||||
|
.. Unless required by applicable law or agreed to in writing, |
||||||
|
software distributed under the License is distributed on an |
||||||
|
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||||
|
KIND, either express or implied. See the License for the |
||||||
|
specific language governing permissions and limitations |
||||||
|
under the License. |
||||||
|
|
||||||
|
DVC |
||||||
|
=== |
||||||
|
|
||||||
|
An example of the DVC task type and a detailed look at how it works in **PyDolphinScheduler**. |
||||||
|
|
||||||
|
Example |
||||||
|
------- |
||||||
|
|
||||||
|
.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_dvc_example.py |
||||||
|
:start-after: [start workflow_declare] |
||||||
|
:end-before: [end workflow_declare] |
||||||
|
|
||||||
|
Dive Into |
||||||
|
--------- |
||||||
|
|
||||||
|
.. automodule:: pydolphinscheduler.tasks.dvc |
||||||
|
|
||||||
|
|
||||||
|
YAML file example |
||||||
|
----------------- |
||||||
|
|
||||||
|
.. literalinclude:: ../../../examples/yaml_define/Dvc.yaml |
||||||
|
:start-after: # under the License. |
||||||
|
:language: yaml |
@ -0,0 +1,46 @@ |
|||||||
|
# Licensed to the Apache Software Foundation (ASF) under one |
||||||
|
# or more contributor license agreements. See the NOTICE file |
||||||
|
# distributed with this work for additional information |
||||||
|
# regarding copyright ownership. The ASF licenses this file |
||||||
|
# to you under the Apache License, Version 2.0 (the |
||||||
|
# "License"); you may not use this file except in compliance |
||||||
|
# with the License. You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, |
||||||
|
# software distributed under the License is distributed on an |
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||||
|
# KIND, either express or implied. See the License for the |
||||||
|
# specific language governing permissions and limitations |
||||||
|
# under the License. |
||||||
|
|
||||||
|
# Define variable `repository`.
# The `&repository` anchor lets every task below reuse the same value through
# the `*repository` alias.
repository: &repository "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"

# Define the workflow
workflow:
  name: "DVC"
  release_state: "offline"

# Define the tasks under the workflow
tasks:
  # DVCInit task: takes the repository and a `store_url`.
  - name: init_dvc
    task_type: DVCInit
    repository: *repository
    store_url: ~/dvc_data

  # DVCUpload task: takes a path in the worker, a path in the DVC repository,
  # a version and a message.
  - name: upload_data
    task_type: DVCUpload
    repository: *repository
    data_path_in_dvc_repository: "iris"
    data_path_in_worker: ~/source/iris
    version: v1
    message: upload iris data v1

  # DVCDownload task: same paths and version as the upload above, but with a
  # different path in the worker and no message.
  - name: download_data
    task_type: DVCDownload
    repository: *repository
    data_path_in_dvc_repository: "iris"
    data_path_in_worker: ~/target/iris
    version: v1
@ -0,0 +1,52 @@ |
|||||||
|
# Licensed to the Apache Software Foundation (ASF) under one |
||||||
|
# or more contributor license agreements. See the NOTICE file |
||||||
|
# distributed with this work for additional information |
||||||
|
# regarding copyright ownership. The ASF licenses this file |
||||||
|
# to you under the Apache License, Version 2.0 (the |
||||||
|
# "License"); you may not use this file except in compliance |
||||||
|
# with the License. You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, |
||||||
|
# software distributed under the License is distributed on an |
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||||
|
# KIND, either express or implied. See the License for the |
||||||
|
# specific language governing permissions and limitations |
||||||
|
# under the License. |
||||||
|
|
||||||
|
# [start workflow_declare]
"""An example workflow for task dvc."""

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks import DVCDownload, DVCInit, DVCUpload

repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"

with ProcessDefinition(name="task_dvc_example", tenant="tenant_exists") as pd:
    # Init task: configured with the repository and the store url.
    init_task = DVCInit(
        name="init_dvc",
        repository=repository,
        store_url="~/dvc_data",
    )

    # Upload task: worker path ~/source/iris, repository path "iris", version v1.
    upload_task = DVCUpload(
        name="upload_data",
        repository=repository,
        data_path_in_worker="~/source/iris",
        data_path_in_dvc_repository="iris",
        version="v1",
        message="upload iris data v1",
    )

    # Download task: same repository path and version, different worker path.
    download_task = DVCDownload(
        name="download_data",
        repository=repository,
        data_path_in_worker="~/target/iris",
        data_path_in_dvc_repository="iris",
        version="v1",
    )

    # Declare the order: init, then upload, then download.
    init_task >> upload_task >> download_task

    pd.run()

# [end workflow_declare]
@ -0,0 +1,124 @@ |
|||||||
|
# Licensed to the Apache Software Foundation (ASF) under one |
||||||
|
# or more contributor license agreements. See the NOTICE file |
||||||
|
# distributed with this work for additional information |
||||||
|
# regarding copyright ownership. The ASF licenses this file |
||||||
|
# to you under the Apache License, Version 2.0 (the |
||||||
|
# "License"); you may not use this file except in compliance |
||||||
|
# with the License. You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, |
||||||
|
# software distributed under the License is distributed on an |
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||||
|
# KIND, either express or implied. See the License for the |
||||||
|
# specific language governing permissions and limitations |
||||||
|
# under the License. |
||||||
|
|
||||||
|
"""Task dvc.""" |
||||||
|
from copy import deepcopy |
||||||
|
from typing import Dict |
||||||
|
|
||||||
|
from pydolphinscheduler.constants import TaskType |
||||||
|
from pydolphinscheduler.core.task import Task |
||||||
|
|
||||||
|
|
||||||
|
class DvcTaskType(str):
    """Names of the DVC task kinds declared in this module.

    Each constant is a plain string; the DVC task classes assign one of them
    to their ``dvc_task_type`` class attribute.
    """

    # Used by ``DVCInit``.
    INIT = "Init DVC"
    # Used by ``DVCDownload``.
    DOWNLOAD = "Download"
    # Used by ``DVCUpload``.
    UPLOAD = "Upload"
||||||
|
|
||||||
|
|
||||||
|
class BaseDVC(Task):
    """Common parent of the DVC tasks.

    Stores the repository every DVC task needs and, when the task params are
    requested, adds the attribute names declared by the concrete subclass.
    """

    # Overridden by each concrete subclass with a ``DvcTaskType`` constant.
    dvc_task_type = None

    # Attribute names contributed by every DVC task.
    _task_custom_attr = {"dvc_task_type", "dvc_repository"}

    # Extra attribute names contributed by a concrete subclass.
    _child_task_dvc_attr = set()

    def __init__(self, name: str, repository: str, *args, **kwargs):
        """Create a task of type ``TaskType.DVC`` and remember ``repository``.

        :param name: task name, forwarded to ``Task``.
        :param repository: stored as ``dvc_repository``.
        """
        super().__init__(name, TaskType.DVC, *args, **kwargs)
        self.dvc_repository = repository

    @property
    def task_params(self) -> Dict:
        """Return task params, including the subclass-specific DVC attributes."""
        # Work on a fresh per-instance set so the class-level set is never
        # mutated by the update below.
        merged_attr = set(self._task_custom_attr)
        merged_attr.update(self._child_task_dvc_attr)
        self._task_custom_attr = merged_attr
        return super().task_params
||||||
|
|
||||||
|
|
||||||
|
class DVCInit(BaseDVC):
    """DVC task whose ``dvc_task_type`` is ``DvcTaskType.INIT``.

    :param name: task name.
    :param repository: forwarded to ``BaseDVC``.
    :param store_url: stored as ``dvc_store_url``.
    """

    dvc_task_type = DvcTaskType.INIT

    # Adds the store url to the attributes collected by ``task_params``.
    _child_task_dvc_attr = {"dvc_store_url"}

    def __init__(self, name: str, repository: str, store_url: str, *args, **kwargs):
        super().__init__(name, repository, *args, **kwargs)
        self.dvc_store_url = store_url
||||||
|
|
||||||
|
|
||||||
|
class DVCDownload(BaseDVC):
    """DVC task whose ``dvc_task_type`` is ``DvcTaskType.DOWNLOAD``.

    :param name: task name.
    :param repository: forwarded to ``BaseDVC``.
    :param data_path_in_dvc_repository: stored as ``dvc_data_location``.
    :param data_path_in_worker: stored as ``dvc_load_save_data_path``.
    :param version: stored as ``dvc_version``.
    """

    dvc_task_type = DvcTaskType.DOWNLOAD

    # Attribute names this task adds to the ones collected by ``task_params``.
    _child_task_dvc_attr = {
        "dvc_data_location",
        "dvc_load_save_data_path",
        "dvc_version",
    }

    def __init__(
        self,
        name: str,
        repository: str,
        data_path_in_dvc_repository: str,
        data_path_in_worker: str,
        version: str,
        *args,
        **kwargs
    ):
        super().__init__(name, repository, *args, **kwargs)
        self.dvc_version = version
        self.dvc_load_save_data_path = data_path_in_worker
        self.dvc_data_location = data_path_in_dvc_repository
||||||
|
|
||||||
|
|
||||||
|
class DVCUpload(BaseDVC):
    """DVC task whose ``dvc_task_type`` is ``DvcTaskType.UPLOAD``.

    :param name: task name.
    :param repository: forwarded to ``BaseDVC``.
    :param data_path_in_worker: stored as ``dvc_load_save_data_path``.
    :param data_path_in_dvc_repository: stored as ``dvc_data_location``.
    :param version: stored as ``dvc_version``.
    :param message: stored as ``dvc_message``.
    """

    dvc_task_type = DvcTaskType.UPLOAD

    # Attribute names this task adds to the ones collected by ``task_params``.
    _child_task_dvc_attr = {
        "dvc_data_location",
        "dvc_load_save_data_path",
        "dvc_version",
        "dvc_message",
    }

    def __init__(
        self,
        name: str,
        repository: str,
        data_path_in_worker: str,
        data_path_in_dvc_repository: str,
        version: str,
        message: str,
        *args,
        **kwargs
    ):
        super().__init__(name, repository, *args, **kwargs)
        self.dvc_message = message
        self.dvc_version = version
        self.dvc_load_save_data_path = data_path_in_worker
        self.dvc_data_location = data_path_in_dvc_repository
@ -0,0 +1,173 @@ |
|||||||
|
# Licensed to the Apache Software Foundation (ASF) under one |
||||||
|
# or more contributor license agreements. See the NOTICE file |
||||||
|
# distributed with this work for additional information |
||||||
|
# regarding copyright ownership. The ASF licenses this file |
||||||
|
# to you under the Apache License, Version 2.0 (the |
||||||
|
# "License"); you may not use this file except in compliance |
||||||
|
# with the License. You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, |
||||||
|
# software distributed under the License is distributed on an |
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
||||||
|
# KIND, either express or implied. See the License for the |
||||||
|
# specific language governing permissions and limitations |
||||||
|
# under the License. |
||||||
|
|
||||||
|
"""Test Task Dvc.""" |
||||||
|
from unittest.mock import patch |
||||||
|
|
||||||
|
from pydolphinscheduler.constants import TaskType |
||||||
|
from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DvcTaskType, DVCUpload |
||||||
|
|
||||||
|
repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git" |
||||||
|
|
||||||
|
|
||||||
|
def test_dvc_init_get_define():
    """Check the definition produced by ``DVCInit.get_define``."""
    name = "test_dvc_init"
    dvc_store_url = "~/dvc_data"

    # Values handed back by the patched ``Task.gen_code_and_version``.
    code, version = 123, 1

    expected_task_params = {
        "resourceList": [],
        "localParams": [],
        "dvcTaskType": DvcTaskType.INIT,
        "dvcRepository": repository,
        "dvcStoreUrl": dvc_store_url,
        "dependence": {},
        "conditionResult": {"successNode": [""], "failedNode": [""]},
        "waitStartTimeout": {},
    }
    expect = {
        "code": code,
        "name": name,
        "version": version,
        "description": None,
        "delayTime": 0,
        "taskType": TaskType.DVC,
        "taskParams": expected_task_params,
        "flag": "YES",
        "taskPriority": "MEDIUM",
        "workerGroup": "default",
        "environmentCode": None,
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }

    code_and_version = patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        return_value=(code, version),
    )
    with code_and_version:
        dvc_init = DVCInit(name, repository, dvc_store_url)
        assert dvc_init.get_define() == expect
||||||
|
|
||||||
|
|
||||||
|
def test_dvc_upload_get_define():
    """Test task dvc upload function get_define.

    The DVC data version (``dvc_version``) and the task definition version
    (``version``) are kept in separate variables so that the string passed to
    ``DVCUpload`` is really the one checked under ``dvcVersion``.
    """
    name = "test_dvc_upload"
    data_path_in_dvc_repository = "iris"
    data_path_in_worker = "~/source/iris"
    # Must not share a name with the task version below: reusing ``version``
    # replaced this string with the int 1 before it was ever used.
    dvc_version = "v1"
    message = "upload iris data v1"

    # Values handed back by the patched ``Task.gen_code_and_version``.
    code = 123
    version = 1
    expect = {
        "code": code,
        "name": name,
        "version": version,
        "description": None,
        "delayTime": 0,
        "taskType": TaskType.DVC,
        "taskParams": {
            "resourceList": [],
            "localParams": [],
            "dvcTaskType": DvcTaskType.UPLOAD,
            "dvcRepository": repository,
            "dvcDataLocation": data_path_in_dvc_repository,
            "dvcLoadSaveDataPath": data_path_in_worker,
            "dvcVersion": dvc_version,
            "dvcMessage": message,
            "dependence": {},
            "conditionResult": {"successNode": [""], "failedNode": [""]},
            "waitStartTimeout": {},
        },
        "flag": "YES",
        "taskPriority": "MEDIUM",
        "workerGroup": "default",
        "environmentCode": None,
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }
    with patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        return_value=(code, version),
    ):
        dvc_upload = DVCUpload(
            name,
            repository=repository,
            data_path_in_dvc_repository=data_path_in_dvc_repository,
            data_path_in_worker=data_path_in_worker,
            version=dvc_version,
            message=message,
        )
        assert dvc_upload.get_define() == expect
||||||
|
|
||||||
|
|
||||||
|
def test_dvc_download_get_define():
    """Test task dvc download function get_define.

    The DVC data version (``dvc_version``) and the task definition version
    (``version``) are kept in separate variables so that the string passed to
    ``DVCDownload`` is really the one checked under ``dvcVersion``.
    """
    # Named after the task under test (was a copy of the upload test's name).
    name = "test_dvc_download"
    data_path_in_dvc_repository = "iris"
    data_path_in_worker = "~/target/iris"
    # Must not share a name with the task version below: reusing ``version``
    # replaced this string with the int 1 before it was ever used.
    dvc_version = "v1"

    # Values handed back by the patched ``Task.gen_code_and_version``.
    code = 123
    version = 1
    expect = {
        "code": code,
        "name": name,
        "version": version,
        "description": None,
        "delayTime": 0,
        "taskType": TaskType.DVC,
        "taskParams": {
            "resourceList": [],
            "localParams": [],
            "dvcTaskType": DvcTaskType.DOWNLOAD,
            "dvcRepository": repository,
            "dvcDataLocation": data_path_in_dvc_repository,
            "dvcLoadSaveDataPath": data_path_in_worker,
            "dvcVersion": dvc_version,
            "dependence": {},
            "conditionResult": {"successNode": [""], "failedNode": [""]},
            "waitStartTimeout": {},
        },
        "flag": "YES",
        "taskPriority": "MEDIUM",
        "workerGroup": "default",
        "environmentCode": None,
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }
    with patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        return_value=(code, version),
    ):
        dvc_download = DVCDownload(
            name,
            repository=repository,
            data_path_in_dvc_repository=data_path_in_dvc_repository,
            data_path_in_worker=data_path_in_worker,
            version=dvc_version,
        )
        assert dvc_download.get_define() == expect
@ -1,30 +0,0 @@ |
|||||||
/* |
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
* contributor license agreements. See the NOTICE file distributed with |
|
||||||
* this work for additional information regarding copyright ownership. |
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
* (the "License"); you may not use this file except in compliance with |
|
||||||
* the License. You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package org.apache.dolphinscheduler.plugin.task.dvc; |
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty; |
|
||||||
|
|
||||||
/**
 * Kinds of DVC task.
 *
 * <p>Each constant is annotated with {@code @JsonProperty}, which names the string used for
 * that constant in JSON. NOTE(review): these strings presumably have to match the task type
 * names sent by clients; confirm against the callers.
 */
public enum TaskTypeEnum {

    // JSON name: "Upload"
    @JsonProperty("Upload")
    UPLOAD,
    // JSON name: "Download"
    @JsonProperty("Download")
    DOWNLOAD,
    // JSON name: "Init DVC"
    @JsonProperty("Init DVC")
    INIT
}
|
Loading…
Reference in new issue