diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index 0fdd2a41e4..b688843812 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -181,6 +181,10 @@ export default {
title: 'Dinky',
link: '/en-us/docs/dev/user_doc/guide/task/dinky.html',
},
+ {
+ title: 'SageMaker',
+ link: '/en-us/docs/dev/user_doc/guide/task/sagemaker.html',
+ },
],
},
{
@@ -601,6 +605,10 @@ export default {
title: 'Dinky',
link: '/zh-cn/docs/dev/user_doc/guide/task/dinky.html',
},
+ {
+ title: 'SageMaker',
+ link: '/zh-cn/docs/dev/user_doc/guide/task/SageMaker.html',
+ },
],
},
{
diff --git a/docs/docs/en/guide/task/sagemaker.md b/docs/docs/en/guide/task/sagemaker.md
new file mode 100644
index 0000000000..8ee01b1b66
--- /dev/null
+++ b/docs/docs/en/guide/task/sagemaker.md
@@ -0,0 +1,64 @@
+# SageMaker Node
+
+## Overview
+
+[Amazon SageMaker](https://docs.aws.amazon.com/sagemaker/index.html) is a fully managed machine learning service. With Amazon SageMaker, data scientists and developers can quickly build and train machine learning models, and then deploy them into a production-ready hosted environment.
+
+[Amazon SageMaker Model Building Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html) is a tool for building machine learning pipelines that take advantage of direct SageMaker integration.
+
+
+For users using big data and machine learning, SageMaker task plugin help users connect big data workflows with SageMaker usage scenarios.
+
+DolphinScheduler SageMaker task plugin features are as follows:
+
+- Start a SageMaker pipeline execution. Continuously get the execution status until the pipeline completes execution.
+
+## Create Task
+
+- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the "Create Workflow" button to enter the
+ DAG editing page.
+- Drag from the toolbar
task node to canvas.
+
+## Task Example
+
+First, introduce some general parameters of DolphinScheduler:
+
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select
+ the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high
+ to low, and tasks with the same priority will execute in a first-in first-out order.
+- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected,
+ randomly select a worker machine for execution.
+- **Environment Name**: Configure the environment name in which run the script.
+- **Times of failed retry attempts**: The number of times the task failed to resubmit.
+- **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task.
+- **Delayed execution time**: The time (unit minute) that a task delays in execution.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm
+ email will send and the task execution will fail.
+- **Predecessor task**: Selecting a predecessor task for the current task, will set the selected predecessor task as
+ upstream of the current task.
+
+Here are some specific parameters for the SagaMaker plugin:
+
+- **SagemakerRequestJson**: Request parameters of StartPipelineExecution,see also [AWS API](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html)
+
+
+The task plugin are shown as follows:
+
+![sagemaker_pipeline](../../../../img/tasks/demo/sagemaker_pipeline.png)
+
+
+
+## Environment to prepare
+
+Some AWS configuration is required, modify a field in file `common.properties`
+```yaml
+# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
+resource.aws.access.key.id=
+# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
+resource.aws.secret.access.key=
+# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
+resource.aws.region=
+```
\ No newline at end of file
diff --git a/docs/docs/zh/guide/task/sagemaker.md b/docs/docs/zh/guide/task/sagemaker.md
new file mode 100644
index 0000000000..1b0e728e82
--- /dev/null
+++ b/docs/docs/zh/guide/task/sagemaker.md
@@ -0,0 +1,57 @@
+# SageMaker 节点
+
+## 综述
+
+[Amazon SageMaker](https://aws.amazon.com/cn/pm/sagemaker) 是一个云机器学习平台。 提供了完整的基础设施,工具和工作流来帮助用户可以创建、训练和发布机器学习模型。
+
+[Amazon SageMaker Model Building Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html) 是一个可以直接使用SageMaker各种集成的机器学习管道构建工具,用户可以使用使用 Amazon SageMaker Pipeline 来构建端到端的机器学习系统。
+
+对于使用大数据与人工智能的用户,SageMaker 任务组件帮助用户可以串联起大数据工作流与SagaMaker的使用场景。
+
+DolphinScheduler SageMaker 组件的功能:
+- 启动 SageMaker Pipeline Execution,并持续获取状态,直至Pipeline执行完成。
+
+## 创建任务
+
+- 点击项目管理-项目名称-工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
+- 拖动工具栏的
任务节点到画板中。
+
+
+## 任务样例
+
+首先介绍一些DS通用参数
+
+- **节点名称** :设置任务的名称。一个工作流定义中的节点名称是唯一的。
+- **运行标志** :标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
+- **描述** :描述该节点的功能。
+- **任务优先级** :worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
+- **Worker 分组** :任务分配给 worker 组的机器执行,选择 Default,会随机选择一台 worker 机执行。
+- **环境名称** :配置运行脚本的环境。
+- **失败重试次数** :任务失败重新提交的次数。
+- **失败重试间隔** :任务失败重新提交任务的时间间隔,以分钟为单位。
+- **延迟执行时间** :任务延迟执行的时间,以分钟为单位。
+- **超时告警** :勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。
+- **前置任务** :选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
+
+以上参数如无特殊需求,可以默认即可
+
+- **SagemakerRequestJson**: 启动SageMakerPipeline的需要的请求参数,可见 [AWS API](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html)
+
+
+组件图示如下:
+
+![sagemaker_pipeline](../../../../img/tasks/demo/sagemaker_pipeline.png)
+
+
+
+## 环境配置
+
+需要进行AWS的一些配置,修改`common.properties`中的`xxxxx`为你的配置信息
+```yaml
+# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
+resource.aws.access.key.id=
+# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
+resource.aws.secret.access.key=
+# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
+resource.aws.region=
+```
\ No newline at end of file
diff --git a/docs/img/tasks/demo/sagemaker_pipeline.png b/docs/img/tasks/demo/sagemaker_pipeline.png
new file mode 100644
index 0000000000..be4eff1653
Binary files /dev/null and b/docs/img/tasks/demo/sagemaker_pipeline.png differ
diff --git a/docs/img/tasks/icons/sagemaker.png b/docs/img/tasks/icons/sagemaker.png
new file mode 100644
index 0000000000..9b8206e741
Binary files /dev/null and b/docs/img/tasks/icons/sagemaker.png differ
diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE
index 67a90d91a1..adb79ca250 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -432,6 +432,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
aws-java-sdk-s3 1.12.160 https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-s3/1.12.160 Apache 2.0
aws-java-sdk-core-1.12.160 https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-core/1.12.160 Apache 2.0
aws-java-sdk-kms-1.12.160 https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-kms/1.12.160 Apache 2.0
+ aws-java-sdk-sagemaker-1.12.160 https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sagemaker/1.12.160 Apache 2.0
commons-text 1.8: https://mvnrepository.com/artifact/org.apache.commons/commons-text/1.8, Apache 2.0
httpasyncclient 4.1.4: https://mvnrepository.com/artifact/org.apache.httpcomponents/httpasyncclient/4.1.4, Apache 2.0
httpcore-nio 4.4.14: https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore-nio/4.4.14, Apache 2.0
diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-aws-java-sdk-sagemaker.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-aws-java-sdk-sagemaker.txt
new file mode 100644
index 0000000000..f49a4e16e6
--- /dev/null
+++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-aws-java-sdk-sagemaker.txt
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index 759b94505e..aebaa5e07f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -183,6 +183,12 @@
dolphinscheduler-task-dinky
${project.version}
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-task-sagemaker
+ ${project.version}
+
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
new file mode 100644
index 0000000000..66f761d825
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
@@ -0,0 +1,54 @@
+
+
+
+
+ dolphinscheduler-task-plugin
+ org.apache.dolphinscheduler
+ dev-SNAPSHOT
+
+ 4.0.0
+
+ dolphinscheduler-task-sagemaker
+ jar
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-spi
+ provided
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-task-api
+ provided
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-common
+ provided
+
+
+ com.amazonaws
+ aws-java-sdk-sagemaker
+ 1.12.160
+
+
+
+
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java
new file mode 100644
index 0000000000..b5df5b9ea5
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.sagemaker.AmazonSageMaker;
+import com.amazonaws.services.sagemaker.model.DescribePipelineExecutionRequest;
+import com.amazonaws.services.sagemaker.model.DescribePipelineExecutionResult;
+import com.amazonaws.services.sagemaker.model.ListPipelineExecutionStepsRequest;
+import com.amazonaws.services.sagemaker.model.ListPipelineExecutionStepsResult;
+import com.amazonaws.services.sagemaker.model.PipelineExecutionStep;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionRequest;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionResult;
+import com.amazonaws.services.sagemaker.model.StopPipelineExecutionRequest;
+import com.amazonaws.services.sagemaker.model.StopPipelineExecutionResult;
+
+public class PipelineUtils {
+
+
+ protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+ private final AmazonSageMaker client;
+ private String pipelineExecutionArn;
+ private String clientRequestToken;
+ private String pipelineStatus;
+
+ public PipelineUtils(AmazonSageMaker client) {
+ this.client = client;
+ }
+
+ public int startPipelineExecution(StartPipelineExecutionRequest request) {
+ int exitStatusCode = TaskConstants.EXIT_CODE_FAILURE;
+ try {
+ StartPipelineExecutionResult result = client.startPipelineExecution(request);
+ pipelineExecutionArn = result.getPipelineExecutionArn();
+ clientRequestToken = request.getClientRequestToken();
+ exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+ logger.info("Start pipeline: {} success", pipelineExecutionArn);
+ } catch (Exception e) {
+ logger.error("Start pipeline error: {}", e.getMessage());
+ }
+
+ return exitStatusCode;
+ }
+
+ public void stopPipelineExecution() {
+ StopPipelineExecutionRequest request = new StopPipelineExecutionRequest();
+ request.setPipelineExecutionArn(pipelineExecutionArn);
+ request.setClientRequestToken(clientRequestToken);
+
+ try {
+ StopPipelineExecutionResult result = client.stopPipelineExecution(request);
+ logger.info("Stop pipeline: {} success", result.getPipelineExecutionArn());
+ } catch (Exception e) {
+ logger.error("Stop pipeline error: {}", e.getMessage());
+ }
+
+ }
+
+ public int checkPipelineExecutionStatus() {
+ describePipelineExecution();
+ while (pipelineStatus.equals("Executing")) {
+ logger.info("check Pipeline Steps running");
+ listPipelineExecutionSteps();
+ ThreadUtils.sleep(SagemakerConstants.CHECK_PIPELINE_EXECUTION_STATUS_INTERVAL);
+ describePipelineExecution();
+ }
+
+ int exitStatusCode = TaskConstants.EXIT_CODE_FAILURE;
+ if (pipelineStatus.equals("Succeeded")) {
+ exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+ }
+ logger.info("exit : {}", exitStatusCode);
+ logger.info("PipelineExecutionStatus : {}", pipelineStatus);
+ return exitStatusCode;
+ }
+
+ private void describePipelineExecution() {
+ DescribePipelineExecutionRequest request = new DescribePipelineExecutionRequest();
+ request.setPipelineExecutionArn(pipelineExecutionArn);
+ DescribePipelineExecutionResult result = client.describePipelineExecution(request);
+ pipelineStatus = result.getPipelineExecutionStatus();
+ logger.info("PipelineExecutionStatus: {}", pipelineStatus);
+ }
+
+ private void listPipelineExecutionSteps() {
+ ListPipelineExecutionStepsRequest request = new ListPipelineExecutionStepsRequest();
+ request.setPipelineExecutionArn(pipelineExecutionArn);
+ request.setMaxResults(SagemakerConstants.PIPELINE_MAX_RESULTS);
+ ListPipelineExecutionStepsResult result = client.listPipelineExecutionSteps(request);
+ List steps = result.getPipelineExecutionSteps();
+ Collections.reverse(steps);
+ logger.info("pipelineStepsStatus: ");
+ for (PipelineExecutionStep step : steps) {
+ String stepMessage = step.toString();
+ logger.info(stepMessage);
+ }
+ }
+
+ public String getPipelineExecutionArn() {
+ return pipelineExecutionArn;
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerConstants.java
new file mode 100644
index 0000000000..143abbb644
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+public class SagemakerConstants {
+ public static final int CHECK_PIPELINE_EXECUTION_STATUS_INTERVAL = 5000;
+ public static final int PIPELINE_MAX_RESULTS = 100;
+
+ private SagemakerConstants() {
+ throw new IllegalStateException("Utility class");
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
new file mode 100644
index 0000000000..3b33eded1a
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@ToString
+public class SagemakerParameters extends AbstractParameters {
+
+ /**
+ * request script
+ */
+ private String sagemakerRequestJson;
+
+ @Override
+ public boolean checkParameters() {
+ return StringUtils.isNotEmpty(sagemakerRequestJson);
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
new file mode 100644
index 0000000000..1431da1e92
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import java.util.Map;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.sagemaker.AmazonSageMaker;
+import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionRequest;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+/**
+ * SagemakerTask task, Used to start Sagemaker pipeline
+ */
+public class SagemakerTask extends AbstractTaskExecutor {
+
+ private static final ObjectMapper objectMapper =
+ new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false).configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true).configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+ .configure(REQUIRE_SETTERS_FOR_GETTERS, true).setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+ /**
+ * taskExecutionContext
+ */
+ private final TaskExecutionContext taskExecutionContext;
+ /**
+ * SageMaker parameters
+ */
+ private SagemakerParameters parameters;
+ private PipelineUtils utils;
+
+ public SagemakerTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+
+ this.taskExecutionContext = taskExecutionContext;
+
+ }
+
+ @Override
+ public void init() {
+ logger.info("Sagemaker task params {}", taskExecutionContext.getTaskParams());
+
+ parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SagemakerParameters.class);
+
+ if (!parameters.checkParameters()) {
+ throw new SagemakerTaskException("Sagemaker task params is not valid");
+ }
+
+ }
+
+ @Override
+ public void handle() throws SagemakerTaskException {
+ try {
+ int exitStatusCode = handleStartPipeline();
+ setExitStatusCode(exitStatusCode);
+ } catch (Exception e) {
+ setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+ throw new SagemakerTaskException("SageMaker task error", e);
+ }
+ }
+
+ @Override
+ public void cancelApplication(boolean cancelApplication) {
+ // stop pipeline
+ utils.stopPipelineExecution();
+ }
+
+ public int handleStartPipeline() {
+ int exitStatusCode;
+ StartPipelineExecutionRequest request = createStartPipelineRequest();
+
+ try {
+ AmazonSageMaker client = createClient();
+ utils = new PipelineUtils(client);
+ setAppIds(utils.getPipelineExecutionArn());
+ } catch (Exception e) {
+ throw new SagemakerTaskException("can not connect aws ", e);
+ }
+
+ // Start pipeline
+ exitStatusCode = utils.startPipelineExecution(request);
+ if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+ // Keep checking the health status
+ exitStatusCode = utils.checkPipelineExecutionStatus();
+ }
+ return exitStatusCode;
+ }
+
+ public StartPipelineExecutionRequest createStartPipelineRequest() throws SagemakerTaskException {
+
+ String requestJson = parameters.getSagemakerRequestJson();
+ requestJson = parseRequstJson(requestJson);
+
+ StartPipelineExecutionRequest startPipelineRequest;
+ try {
+ startPipelineRequest = objectMapper.readValue(requestJson, StartPipelineExecutionRequest.class);
+ } catch (Exception e) {
+ logger.error("can not parse SagemakerRequestJson from json: {}", requestJson);
+ throw new SagemakerTaskException("can not parse SagemakerRequestJson ", e);
+ }
+
+ logger.info("Sagemaker task create StartPipelineRequest: {}", startPipelineRequest);
+ return startPipelineRequest;
+ }
+
+ @Override
+ public SagemakerParameters getParameters() {
+ return parameters;
+ }
+
+ private String parseRequstJson(String requestJson) {
+ // combining local and global parameters
+ Map paramsMap = taskExecutionContext.getPrepareParamsMap();
+ return ParameterUtils.convertParameterPlaceholders(requestJson, ParamUtils.convert(paramsMap));
+ }
+
+ private AmazonSageMaker createClient() {
+ final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+ final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+ final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
+ final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
+ final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
+ // create a SageMaker client
+ return AmazonSageMakerClientBuilder.standard()
+ .withCredentials(awsCredentialsProvider)
+ .withRegion(awsRegion)
+ .build();
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
new file mode 100644
index 0000000000..9e88471cb6
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+public class SagemakerTaskChannel implements TaskChannel {
+
+ @Override
+ public void cancelApplication(boolean status) {
+
+ }
+
+ @Override
+ public SagemakerTask createTask(TaskExecutionContext taskRequest) {
+ return new SagemakerTask(taskRequest);
+ }
+
+ @Override
+ public AbstractParameters parseParameters(ParametersNode parametersNode) {
+ return JSONUtils.parseObject(parametersNode.getTaskParams(), SagemakerParameters.class);
+ }
+
+ @Override
+ public ResourceParametersHelper getResources(String parameters) {
+ return null;
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannelFactory.java
new file mode 100644
index 0000000000..0c4afec462
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannelFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(TaskChannelFactory.class)
+public class SagemakerTaskChannelFactory implements TaskChannelFactory {
+ @Override
+ public TaskChannel create() {
+ return new SagemakerTaskChannel();
+ }
+
+ @Override
+ public String getName() {
+ return "SAGEMAKER";
+ }
+
+ @Override
+ public List getParams() {
+ return Collections.emptyList();
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskException.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskException.java
new file mode 100644
index 0000000000..92beb5c5c2
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+/**
+ * Custom SagemakerTaskException
+ */
+public class SagemakerTaskException extends RuntimeException {
+
+ public SagemakerTaskException() {
+ super();
+ }
+
+ public SagemakerTaskException(String message) {
+ super(message);
+ }
+
+ public SagemakerTaskException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
new file mode 100644
index 0000000000..a7dcdca7bd
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.sagemaker;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.sagemaker.AmazonSageMaker;
+import com.amazonaws.services.sagemaker.model.DescribePipelineExecutionResult;
+import com.amazonaws.services.sagemaker.model.ListPipelineExecutionStepsResult;
+import com.amazonaws.services.sagemaker.model.PipelineExecutionStep;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionRequest;
+import com.amazonaws.services.sagemaker.model.StartPipelineExecutionResult;
+import com.amazonaws.services.sagemaker.model.StopPipelineExecutionResult;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({JSONUtils.class, PropertyUtils.class,})
+@PowerMockIgnore({"javax.*"})
+@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils")
+public class SagemakerTaskTest {
+
+ private final String pipelineExecutionArn = "test-pipeline-arn";
+ private SagemakerTask sagemakerTask;
+ private AmazonSageMaker client;
+ private PipelineUtils pipelineUtils;
+
+ @Before
+ public void before() {
+ PowerMockito.mockStatic(PropertyUtils.class);
+ String parameters = buildParameters();
+ TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+ Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+ sagemakerTask = new SagemakerTask(taskExecutionContext);
+ sagemakerTask.init();
+ client = mock(AmazonSageMaker.class);
+ pipelineUtils = new PipelineUtils(client);
+
+ StartPipelineExecutionResult startPipelineExecutionResult = mock(StartPipelineExecutionResult.class);
+ when(startPipelineExecutionResult.getPipelineExecutionArn()).thenReturn(pipelineExecutionArn);
+
+ StopPipelineExecutionResult stopPipelineExecutionResult = mock(StopPipelineExecutionResult.class);
+ when(stopPipelineExecutionResult.getPipelineExecutionArn()).thenReturn(pipelineExecutionArn);
+
+ DescribePipelineExecutionResult describePipelineExecutionResult = mock(DescribePipelineExecutionResult.class);
+ when(describePipelineExecutionResult.getPipelineExecutionStatus()).thenReturn("Executing", "Succeeded");
+
+ ListPipelineExecutionStepsResult listPipelineExecutionStepsResult = mock(ListPipelineExecutionStepsResult.class);
+ PipelineExecutionStep pipelineExecutionStep = mock(PipelineExecutionStep.class);
+ List pipelineExecutionSteps = new ArrayList<>();
+ pipelineExecutionSteps.add(pipelineExecutionStep);
+ pipelineExecutionSteps.add(pipelineExecutionStep);
+
+ when(pipelineExecutionStep.toString()).thenReturn("Test Step1", "Test Step2");
+ when(listPipelineExecutionStepsResult.getPipelineExecutionSteps()).thenReturn(pipelineExecutionSteps);
+
+ when(client.startPipelineExecution(any())).thenReturn(startPipelineExecutionResult);
+ when(client.stopPipelineExecution(any())).thenReturn(stopPipelineExecutionResult);
+ when(client.describePipelineExecution(any())).thenReturn(describePipelineExecutionResult);
+ when(client.listPipelineExecutionSteps(any())).thenReturn(listPipelineExecutionStepsResult);
+
+ }
+
+ @Test
+ public void testStartPipelineRequest() throws Exception {
+ StartPipelineExecutionRequest request = sagemakerTask.createStartPipelineRequest();
+ Assert.assertEquals("AbalonePipeline", request.getPipelineName());
+ Assert.assertEquals("test Pipeline", request.getPipelineExecutionDescription());
+ Assert.assertEquals("AbalonePipeline", request.getPipelineExecutionDisplayName());
+ Assert.assertEquals("AbalonePipeline", request.getPipelineName());
+ Assert.assertEquals(new Integer(1), request.getParallelismConfiguration().getMaxParallelExecutionSteps());
+ }
+
+ @Test
+ public void testPipelineExecution() throws Exception {
+ pipelineUtils.startPipelineExecution(sagemakerTask.createStartPipelineRequest());
+ Assert.assertEquals(pipelineExecutionArn, pipelineUtils.getPipelineExecutionArn());
+ Assert.assertEquals(0, pipelineUtils.checkPipelineExecutionStatus());
+ pipelineUtils.stopPipelineExecution();
+ }
+
+ private String buildParameters() {
+ SagemakerParameters parameters = new SagemakerParameters();
+ String sagemakerRequestJson;
+ try (InputStream i = this.getClass().getResourceAsStream("SagemakerRequestJson.json")) {
+ assert i != null;
+ sagemakerRequestJson = IOUtils.toString(i, StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ parameters.setSagemakerRequestJson(sagemakerRequestJson);
+
+ return JSONUtils.toJsonString(parameters);
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/resources/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerRequestJson.json b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/resources/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerRequestJson.json
new file mode 100644
index 0000000000..9a1a28bfa1
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/resources/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerRequestJson.json
@@ -0,0 +1,9 @@
+{
+ "ParallelismConfiguration": {
+ "MaxParallelExecutionSteps": 1
+ },
+ "PipelineExecutionDescription": "test Pipeline",
+ "PipelineExecutionDisplayName": "AbalonePipeline",
+ "PipelineName": "AbalonePipeline",
+ "PipelineParameters": [ ]
+}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml
index f934cc5c83..a9b094b443 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -57,6 +57,7 @@
dolphinscheduler-task-openmldb
dolphinscheduler-task-dvc
dolphinscheduler-task-dinky
+ dolphinscheduler-task-sagemaker
diff --git a/dolphinscheduler-ui/public/images/task-icons/sagemaker.png b/dolphinscheduler-ui/public/images/task-icons/sagemaker.png
new file mode 100644
index 0000000000..9b8206e741
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/sagemaker.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/sagemaker_hover.png b/dolphinscheduler-ui/public/images/task-icons/sagemaker_hover.png
new file mode 100644
index 0000000000..270e9fe563
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/sagemaker_hover.png differ
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 3054129214..c0f2f25046 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -72,3 +72,4 @@ export { useMlflowModels } from './use-mlflow-models'
export { useOpenmldb } from './use-openmldb'
export { useDvc } from './use-dvc'
export { useDinky } from './use-dinky'
+export { useSagemaker } from './use-sagemaker'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sagemaker.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sagemaker.ts
new file mode 100644
index 0000000000..83e52b874e
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sagemaker.ts
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import type { IJsonItem } from '../types'
+import { useCustomParams } from '.'
+
+export function useSagemaker(model: { [field: string]: any }): IJsonItem[] {
+
+ return [
+ {
+ type: 'editor',
+ field: 'sagemakerRequestJson',
+ name: "SagemakerRequestJson",
+ props: {
+ language: 'json'
+ },
+ validate: {
+ trigger: ['input', 'trigger'],
+ required: true,
+ message: 'requestJson'
+ }
+ },
+ ...useCustomParams({ model, field: 'localParams', isSimple: false })
+ ]
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 7a3878bc58..1d719a8e37 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -384,6 +384,10 @@ export function formatParams(data: INodeData): {
taskParams.dvcStoreUrl = data.dvcStoreUrl
}
+ if (data.taskType === 'SAGEMAKER') {
+ taskParams.sagemakerRequestJson = data.sagemakerRequestJson
+ }
+
if (data.taskType === 'DINKY') {
taskParams.address = data.address
taskParams.taskId = data.taskId
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index 4240892eff..1d16aeef03 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -40,6 +40,7 @@ import { useMlflow } from './use-mlflow'
import { useOpenmldb } from './use-openmldb'
import { useDvc } from './use-dvc'
import { useDinky } from './use-dinky'
+import { userSagemaker } from './use-sagemaker'
export default {
SHELL: useShell,
@@ -66,5 +67,6 @@ export default {
MLFLOW: useMlflow,
OPENMLDB: useOpenmldb,
DVC: useDvc,
- DINKY: useDinky
+ DINKY: useDinky,
+ SAGEMAKER: userSagemaker
}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
new file mode 100644
index 0000000000..fc3a3be0a6
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData, ITaskData } from '../types'
+
+export function userSagemaker({
+ projectCode,
+ from = 0,
+ readonly,
+ data
+}: {
+ projectCode: number
+ from?: number
+ readonly?: boolean
+ data?: ITaskData
+}) {
+ const model = reactive({
+ name: '',
+ taskType: 'MLFLOW',
+ flag: 'YES',
+ description: '',
+ timeoutFlag: false,
+ localParams: [],
+ environmentCode: null,
+ failRetryInterval: 1,
+ failRetryTimes: 0,
+ workerGroup: 'default',
+ delayTime: 0,
+ timeout: 30,
+ timeoutNotifyStrategy: ['WARN'],
+ } as INodeData)
+
+ let extra: IJsonItem[] = []
+ if (from === 1) {
+ extra = [
+ Fields.useTaskType(model, readonly),
+ Fields.useProcessName({
+ model,
+ projectCode,
+ isCreate: !data?.id,
+ from,
+ processName: data?.processName
+ })
+ ]
+ }
+
+ return {
+ json: [
+ Fields.useName(from),
+ ...extra,
+ Fields.useRunFlag(),
+ Fields.useDescription(),
+ Fields.useTaskPriority(),
+ Fields.useWorkerGroup(),
+ Fields.useEnvironmentName(model, !model.id),
+ ...Fields.useTaskGroup(model, projectCode),
+ ...Fields.useFailed(),
+ Fields.useDelayTime(model),
+ ...Fields.useTimeoutAlarm(model),
+ ...Fields.useSagemaker(model),
+ Fields.usePreTasks()
+ ] as IJsonItem[],
+ model
+ }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 268d8635b8..9878258338 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -359,6 +359,7 @@ interface ITaskParams {
address?: string
taskId?: string
online?: boolean
+ sagemakerRequestJson?: string
}
interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index 1cc0e00c5b..b441c6229c 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -40,6 +40,7 @@ export type TaskType =
| 'OPENMLDB'
| 'DVC'
| 'DINKY'
+ | 'SAGEMAKER'
export const TASK_TYPES_MAP = {
SHELL: {
@@ -128,5 +129,9 @@ export const TASK_TYPES_MAP = {
DINKY: {
alias: 'DINKY',
helperLinkDisable: true
+ },
+ SAGEMAKER: {
+ alias: 'SageMaker',
+ helperLinkDisable: true
}
} as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index 90615483fd..c22dec8480 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -176,6 +176,9 @@ $bgLight: #ffffff;
&.icon-dinky {
background-image: url('/images/task-icons/dinky.png');
}
+ &.icon-sagemaker {
+ background-image: url('/images/task-icons/sagemaker.png');
+ }
}
&:hover {
@@ -255,6 +258,9 @@ $bgLight: #ffffff;
&.icon-dinky {
background-image: url('/images/task-icons/dinky_hover.png');
}
+ &.icon-sagemaker {
+ background-image: url('/images/task-icons/sagemaker_hover.png');
+ }
}
}
}
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 562e55b20b..49c6d44c7f 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -255,6 +255,7 @@ aws-java-sdk-s3-1.12.160.jar
aws-java-sdk-kms-1.12.160.jar
aws-java-sdk-emr-1.12.160.jar
aws-java-sdk-core-1.12.160.jar
+aws-java-sdk-sagemaker-1.12.160.jar
commons-text-1.8.jar
httpasyncclient-4.1.4.jar
httpcore-nio-4.4.14.jar