From cc088e0f54741c17359b33491a5eff2d2205d4cc Mon Sep 17 00:00:00 2001 From: JieguangZhou Date: Fri, 22 Jul 2022 10:47:08 +0800 Subject: [PATCH] [python] Support SageMaker task type (#11002) --- .../docs/source/tasks/index.rst | 2 + .../docs/source/tasks/sagemaker.rst | 34 ++++++ .../src/pydolphinscheduler/constants.py | 1 + .../examples/task_sagemaker_example.py | 46 ++++++++ .../src/pydolphinscheduler/tasks/sagemaker.py | 40 +++++++ .../tests/tasks/test_sagemaker.py | 101 ++++++++++++++++++ 6 files changed, 224 insertions(+) create mode 100644 dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst index d6bbb960c1..30173f838b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst @@ -40,3 +40,5 @@ In this section datax sub_process + + sagemaker diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst new file mode 100644 index 0000000000..af627a929b --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sagemaker.rst @@ -0,0 +1,34 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +SageMaker +========= + + +A SageMaker task type's example and dive into information of **PyDolphinScheduler**. + +Example +------- + +.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sagemaker_example.py + :start-after: [start workflow_declare] + :end-before: [end workflow_declare] + +Dive Into +--------- + +.. automodule:: pydolphinscheduler.tasks.sagemaker diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index a5089ac165..4544a6989d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -57,6 +57,7 @@ class TaskType(str): FLINK = "FLINK" SPARK = "SPARK" MR = "MR" + SAGEMAKER = "SAGEMAKER" class DefaultTaskCodeNum(str): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py new file mode 100644 index 0000000000..b056f61a63 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_sagemaker_example.py @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# [start workflow_declare] +"""An example workflow for task sagemaker.""" +import json + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.tasks.sagemaker import SageMaker + +sagemaker_request_data = { + "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1}, + "PipelineExecutionDescription": "test Pipeline", + "PipelineExecutionDisplayName": "AbalonePipeline", + "PipelineName": "AbalonePipeline", + "PipelineParameters": [ + {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"}, + {"Name": "ProcessingInstanceCount", "Value": "2"}, + ], +} + +with ProcessDefinition( + name="task_sagemaker_example", + tenant="tenant_exists", +) as pd: + task_sagemaker = SageMaker( + name="task_sagemaker", + sagemaker_request_json=json.dumps(sagemaker_request_data, indent=2), + ) + + pd.run() +# [end workflow_declare] diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py new file mode 100644 index 0000000000..30b128d172 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sagemaker.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task SageMaker.""" + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.task import Task + + +class SageMaker(Task): + """Task SageMaker object, declare behavior for SageMaker task to dolphinscheduler. + + :param name: A unique, meaningful string for the SageMaker task. + :param sagemaker_request_json: Request parameters of StartPipelineExecution, + see also `AWS API + <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html>`_ + + """ + + _task_custom_attr = { + "sagemaker_request_json", + } + + def __init__(self, name: str, sagemaker_request_json: str, *args, **kwargs): + super().__init__(name, TaskType.SAGEMAKER, *args, **kwargs) + self.sagemaker_request_json = sagemaker_request_json diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py new file mode 100644 index 0000000000..8838eaf497 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task SageMaker.""" +import json +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.tasks.sagemaker import SageMaker + +sagemaker_request_json = json.dumps( + { + "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1}, + "PipelineExecutionDescription": "test Pipeline", + "PipelineExecutionDisplayName": "AbalonePipeline", + "PipelineName": "AbalonePipeline", + "PipelineParameters": [ + {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"}, + {"Name": "ProcessingInstanceCount", "Value": "2"}, + ], + }, + indent=2, +) + + +@pytest.mark.parametrize( + "attr, expect", + [ + ( + {"sagemaker_request_json": sagemaker_request_json}, + { + "sagemakerRequestJson": sagemaker_request_json, + "localParams": [], + "resourceList": [], + "dependence": {}, + "waitStartTimeout": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + }, + ) + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_property_task_params(mock_code_version, attr, expect): + """Test task sagemaker task property.""" + task = SageMaker("test-sagemaker-task-params", **attr) + assert expect == task.task_params + + +def test_sagemaker_get_define(): + """Test task sagemaker function get_define.""" + code = 123 + version = 1 + name = "test_sagemaker_get_define" + expect = { + "code": code, + "name": name, + "version": 1, + 
"description": None, + "delayTime": 0, + "taskType": "SAGEMAKER", + "taskParams": { + "resourceList": [], + "localParams": [], + "sagemakerRequestJson": sagemaker_request_json, + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + sagemaker = SageMaker(name, sagemaker_request_json) + assert sagemaker.get_define() == expect