Browse Source

[python] Support SageMaker task type (#11002)

3.1.0-release
JieguangZhou 2 years ago committed by GitHub
parent
commit
cc088e0f54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
  2. 34
      dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst
  3. 1
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  4. 46
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py
  5. 40
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py
  6. 101
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py

2
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst

@ -40,3 +40,5 @@ In this section
datax datax
sub_process sub_process
sagemaker

34
dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst

@ -0,0 +1,34 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
SageMaker
=========
A SageMaker task type's example and dive into information of **PyDolphinScheduler**.
Example
-------
.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sagemaker_example.py
:start-after: [start workflow_declare]
:end-before: [end workflow_declare]
Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.sagemaker

1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -57,6 +57,7 @@ class TaskType(str):
FLINK = "FLINK" FLINK = "FLINK"
SPARK = "SPARK" SPARK = "SPARK"
MR = "MR" MR = "MR"
SAGEMAKER = "SAGEMAKER"
class DefaultTaskCodeNum(str): class DefaultTaskCodeNum(str):

46
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py

@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# [start workflow_declare]
"""A example workflow for task sagemaker."""
import json
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.sagemaker import SageMaker
# Request parameters for SageMaker's StartPipelineExecution call, kept as a
# plain dict so it is easy to read and edit before being serialized.
sagemaker_request_data = {
    "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
    "PipelineExecutionDescription": "test Pipeline",
    "PipelineExecutionDisplayName": "AbalonePipeline",
    "PipelineName": "AbalonePipeline",
    "PipelineParameters": [
        {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
        {"Name": "ProcessingInstanceCount", "Value": "2"},
    ],
}

with ProcessDefinition(name="task_sagemaker_example", tenant="tenant_exists") as pd:
    # The task takes the request as a JSON string, so the dict is serialized here.
    task_sagemaker = SageMaker(
        sagemaker_request_json=json.dumps(sagemaker_request_data, indent=2),
        name="task_sagemaker",
    )
    pd.run()
# [end workflow_declare]

40
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py

@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task SageMaker."""
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
class SageMaker(Task):
    """Declare a SageMaker task and its behavior for dolphinscheduler.

    :param name: A unique, meaningful string for the SageMaker task.
    :param sagemaker_request_json: Request parameters of StartPipelineExecution,
        see also `AWS API
        <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html>`_
    """

    # Attribute names specific to this task type.
    # NOTE(review): presumably read by the base ``Task`` when it builds the task
    # parameters -- confirm in ``pydolphinscheduler.core.task``.
    _task_custom_attr = {"sagemaker_request_json"}

    def __init__(self, name: str, sagemaker_request_json: str, *args, **kwargs):
        task_type = TaskType.SAGEMAKER
        # Any extra positional/keyword arguments are forwarded to ``Task`` unchanged.
        super().__init__(name, task_type, *args, **kwargs)
        self.sagemaker_request_json = sagemaker_request_json

101
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py

@ -0,0 +1,101 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test Task SageMaker."""
import json
from unittest.mock import patch
import pytest
from pydolphinscheduler.tasks.sagemaker import SageMaker
# Sample StartPipelineExecution request body; the tests in this module use its
# serialized form both as task input and as the expected task parameter.
_SAGEMAKER_REQUEST_DATA = {
    "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
    "PipelineExecutionDescription": "test Pipeline",
    "PipelineExecutionDisplayName": "AbalonePipeline",
    "PipelineName": "AbalonePipeline",
    "PipelineParameters": [
        {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
        {"Name": "ProcessingInstanceCount", "Value": "2"},
    ],
}
sagemaker_request_json = json.dumps(_SAGEMAKER_REQUEST_DATA, indent=2)
@pytest.mark.parametrize(
    "attr, expect",
    [
        (
            # Constructor keyword arguments for the task under test.
            dict(sagemaker_request_json=sagemaker_request_json),
            # Expected ``task_params``: the request JSON plus the other
            # entries, which are all empty here.
            dict(
                sagemakerRequestJson=sagemaker_request_json,
                localParams=[],
                resourceList=[],
                dependence={},
                waitStartTimeout={},
                conditionResult={"successNode": [""], "failedNode": [""]},
            ),
        )
    ],
)
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version",
    return_value=(123, 1),
)
def test_property_task_params(mock_code_version, attr, expect):
    """Check that a SageMaker task exposes the expected ``task_params``."""
    sagemaker_task = SageMaker("test-sagemaker-task-params", **attr)
    assert sagemaker_task.task_params == expect
def test_sagemaker_get_define():
    """Test task sagemaker function get_define.

    ``Task.gen_code_and_version`` is patched to return fixed values, and the
    full definition returned by ``get_define`` is compared with ``expect``.
    """
    code = 123
    version = 1
    name = "test_sagemaker_get_define"
    expect = {
        "code": code,
        "name": name,
        # Use the same variable that feeds the patched return value, so the
        # expectation cannot drift from the mocked (code, version) pair.
        "version": version,
        "description": None,
        "delayTime": 0,
        "taskType": "SAGEMAKER",
        "taskParams": {
            "resourceList": [],
            "localParams": [],
            "sagemakerRequestJson": sagemaker_request_json,
            "dependence": {},
            "conditionResult": {"successNode": [""], "failedNode": [""]},
            "waitStartTimeout": {},
        },
        "flag": "YES",
        "taskPriority": "MEDIUM",
        "workerGroup": "default",
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }
    with patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        return_value=(code, version),
    ):
        sagemaker = SageMaker(name, sagemaker_request_json)
        assert sagemaker.get_define() == expect
Loading…
Cancel
Save