Browse Source

[DSIP-19] Support sagemaker connections in the connection center, as well as external connections to the connection center in sagemaker tasks (#14976)

* Refactoring `Sagemaker` task plugin with connections managed in connection center.

---------

Co-authored-by: Eric Gao <ericgao.apache@gmail.com>
3.2.1-prepare
chenrj 1 year ago committed by GitHub
parent
commit
5a3827eef4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-bom/pom.xml
  2. 5
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
  3. 50
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/pom.xml
  4. 65
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerClientWrapper.java
  5. 37
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannel.java
  6. 38
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannelFactory.java
  7. 35
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerConnectionParam.java
  8. 34
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceParamDTO.java
  9. 136
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceProcessor.java
  10. 115
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java
  11. 1
      dolphinscheduler-datasource-plugin/pom.xml
  12. 4
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
  13. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
  14. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml
  15. 28
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java
  16. 33
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
  17. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java
  18. 45
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskExecutionContext.java
  19. 34
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
  20. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
  21. 12
      dolphinscheduler-ui/src/locales/en_US/datasource.ts
  22. 10
      dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
  23. 2
      dolphinscheduler-ui/src/service/modules/data-source/types.ts
  24. 9
      dolphinscheduler-ui/src/views/datasource/list/detail.tsx
  25. 24
      dolphinscheduler-ui/src/views/datasource/list/use-form.ts
  26. 5
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
  27. 5
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  28. 8
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
  29. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

1
dolphinscheduler-bom/pom.xml

@ -120,6 +120,7 @@
<zeppelin-client.version>0.10.1</zeppelin-client.version> <zeppelin-client.version>0.10.1</zeppelin-client.version>
<testcontainer.version>1.17.6</testcontainer.version> <testcontainer.version>1.17.6</testcontainer.version>
<checker-qual.version>3.19.0</checker-qual.version> <checker-qual.version>3.19.0</checker-qual.version>
<zeppelin-client.version>0.10.1</zeppelin-client.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>

5
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml

@ -143,5 +143,10 @@
<artifactId>dolphinscheduler-datasource-doris</artifactId> <artifactId>dolphinscheduler-datasource-doris</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-sagemaker</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

50
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/pom.xml

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-plugin</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-datasource-sagemaker</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sagemaker</artifactId>
</dependency>
</dependencies>
</project>

65
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerClientWrapper.java

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.sagemaker;
import static com.google.common.base.Preconditions.checkNotNull;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sagemaker.AmazonSageMaker;
import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder;
import com.amazonaws.services.sagemaker.model.ListNotebookInstancesRequest;
@Slf4j
public class SagemakerClientWrapper implements AutoCloseable {
private AmazonSageMaker amazonSageMaker;
public SagemakerClientWrapper(String accessKey, String secretAccessKey, String region) {
checkNotNull(accessKey, "sagemaker accessKey cannot be null");
checkNotNull(secretAccessKey, "sagemaker secretAccessKey cannot be null");
checkNotNull(region, "sagemaker region cannot be null");
final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretAccessKey);
final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
// create a SageMaker client
amazonSageMaker = AmazonSageMakerClientBuilder.standard().withCredentials(awsCredentialsProvider)
.withRegion(region).build();
}
public boolean checkConnect() {
try {
// If listing notebook instances fails, an exception will be thrown directly
ListNotebookInstancesRequest request = new ListNotebookInstancesRequest();
amazonSageMaker.listNotebookInstances(request);
log.info("sagemaker client connects to server successfully");
return true;
} catch (Exception e) {
log.info("sagemaker client failed to connect to the server");
}
return false;
}
@Override
public void close() {
}
}

37
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannel.java

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.sagemaker;
import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient;
import org.apache.dolphinscheduler.spi.enums.DbType;
public class SagemakerDataSourceChannel implements DataSourceChannel {
@Override
public AdHocDataSourceClient createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
throw new UnsupportedOperationException("Sagemaker AdHocDataSourceClient is not supported");
}
@Override
public PooledDataSourceClient createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
throw new UnsupportedOperationException("Sagemaker AdHocDataSourceClient is not supported");
}
}

38
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannelFactory.java

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.sagemaker;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
import com.google.auto.service.AutoService;
@AutoService(DataSourceChannelFactory.class)
public class SagemakerDataSourceChannelFactory implements DataSourceChannelFactory {
@Override
public DataSourceChannel create() {
return new SagemakerDataSourceChannel();
}
@Override
public String getName() {
return "sagemaker";
}
}

35
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerConnectionParam.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.sagemaker.param;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonInclude;
@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SagemakerConnectionParam implements ConnectionParam {
protected String userName;
protected String password;
protected String awsRegion;
}

34
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceParamDTO.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.sagemaker.param;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.spi.enums.DbType;
import lombok.Data;
@Data
public class SagemakerDataSourceParamDTO extends BaseDataSourceParamDTO {
protected String awsRegion;
@Override
public DbType getType() {
return DbType.SAGEMAKER;
}
}

136
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceProcessor.java

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.sagemaker.param;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.plugin.datasource.sagemaker.SagemakerClientWrapper;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.text.MessageFormat;
import lombok.extern.slf4j.Slf4j;
import com.google.auto.service.AutoService;
@AutoService(DataSourceProcessor.class)
@Slf4j
public class SagemakerDataSourceProcessor implements DataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, SagemakerDataSourceParamDTO.class);
}
@Override
public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParamDTO) {
SagemakerDataSourceParamDTO sageMakerDataSourceParamDTO = (SagemakerDataSourceParamDTO) datasourceParamDTO;
if (StringUtils.isEmpty(sageMakerDataSourceParamDTO.getUserName())
|| StringUtils.isEmpty(sageMakerDataSourceParamDTO.getPassword())
|| StringUtils.isEmpty(sageMakerDataSourceParamDTO.getAwsRegion())) {
throw new IllegalArgumentException("sagemaker datasource param is not valid");
}
}
@Override
public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) {
SagemakerConnectionParam baseConnectionParam = (SagemakerConnectionParam) connectionParam;
return MessageFormat.format("{0}@{1}@{2}@{3}", dbType.getDescp(),
PasswordUtils.encodePassword(baseConnectionParam.getUserName()),
PasswordUtils.encodePassword(baseConnectionParam.getPassword()),
PasswordUtils.encodePassword(baseConnectionParam.getAwsRegion()));
}
// SageMaker
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
SagemakerConnectionParam connectionParams = (SagemakerConnectionParam) createConnectionParams(connectionJson);
SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO = new SagemakerDataSourceParamDTO();
sagemakerDataSourceParamDTO.setUserName(connectionParams.getUserName());
sagemakerDataSourceParamDTO.setPassword(connectionParams.getPassword());
sagemakerDataSourceParamDTO.setAwsRegion(connectionParams.getAwsRegion());
return sagemakerDataSourceParamDTO;
}
@Override
public SagemakerConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
SagemakerDataSourceParamDTO sageMakerDataSourceParam = (SagemakerDataSourceParamDTO) datasourceParam;
SagemakerConnectionParam sageMakerConnectionParam = new SagemakerConnectionParam();
sageMakerConnectionParam.setUserName(sageMakerDataSourceParam.getUserName());
sageMakerConnectionParam.setPassword(sageMakerDataSourceParam.getPassword());
sageMakerConnectionParam.setAwsRegion(sageMakerDataSourceParam.getAwsRegion());
return sageMakerConnectionParam;
}
@Override
public ConnectionParam createConnectionParams(String connectionJson) {
return JSONUtils.parseObject(connectionJson, SagemakerConnectionParam.class);
}
@Override
public String getDatasourceDriver() {
return "";
}
@Override
public String getValidationQuery() {
return "";
}
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
return "";
}
@Override
public Connection getConnection(ConnectionParam connectionParam) {
return null;
}
@Override
public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) {
SagemakerConnectionParam baseConnectionParam = (SagemakerConnectionParam) connectionParam;
try (
SagemakerClientWrapper sagemakerClientWrapper =
new SagemakerClientWrapper(baseConnectionParam.userName,
baseConnectionParam.password, baseConnectionParam.awsRegion)) {
return sagemakerClientWrapper.checkConnect();
} catch (Exception e) {
log.error("sagemaker client failed to connect to the server", e);
return false;
}
}
@Override
public DbType getDbType() {
return DbType.SAGEMAKER;
}
@Override
public DataSourceProcessor create() {
return new SagemakerDataSourceProcessor();
}
}

115
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.sagemaker;
import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam;
import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerDataSourceProcessor;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class SagemakerDataSourceProcessorTest {
private SagemakerDataSourceProcessor sagemakerDataSourceProcessor;
private String connectJson =
"{\"userName\":\"access key\",\"password\":\"secret access key\",\"awsRegion\":\"region\"}";
@BeforeEach
public void init() {
sagemakerDataSourceProcessor = new SagemakerDataSourceProcessor();
}
@Test
void testCheckDatasourceParam() {
SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO = new SagemakerDataSourceParamDTO();
Assertions.assertThrows(IllegalArgumentException.class,
() -> sagemakerDataSourceProcessor.checkDatasourceParam(sagemakerDataSourceParamDTO));
sagemakerDataSourceParamDTO.setUserName("access key");
Assertions.assertThrows(IllegalArgumentException.class,
() -> sagemakerDataSourceProcessor.checkDatasourceParam(sagemakerDataSourceParamDTO));
sagemakerDataSourceParamDTO.setPassword("secret access key");
Assertions.assertThrows(IllegalArgumentException.class,
() -> sagemakerDataSourceProcessor.checkDatasourceParam(sagemakerDataSourceParamDTO));
sagemakerDataSourceParamDTO.setAwsRegion("region");
Assertions
.assertDoesNotThrow(
() -> sagemakerDataSourceProcessor.checkDatasourceParam(sagemakerDataSourceParamDTO));
}
@Test
void testGetDatasourceUniqueId() {
SagemakerConnectionParam sagemakerConnectionParam = new SagemakerConnectionParam();
sagemakerConnectionParam.setUserName("access key");
sagemakerConnectionParam.setPassword("secret access key");
sagemakerConnectionParam.setAwsRegion("region");
Assertions.assertEquals("sagemaker@access key@secret access key@region",
sagemakerDataSourceProcessor.getDatasourceUniqueId(sagemakerConnectionParam, DbType.SAGEMAKER));
}
@Test
void testCreateDatasourceParamDTO() {
SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO =
(SagemakerDataSourceParamDTO) sagemakerDataSourceProcessor.createDatasourceParamDTO(connectJson);
Assertions.assertEquals("access key", sagemakerDataSourceParamDTO.getUserName());
Assertions.assertEquals("secret access key", sagemakerDataSourceParamDTO.getPassword());
Assertions.assertEquals("region", sagemakerDataSourceParamDTO.getAwsRegion());
}
@Test
void testCreateConnectionParams() {
SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO =
(SagemakerDataSourceParamDTO) sagemakerDataSourceProcessor.createDatasourceParamDTO(connectJson);
SagemakerConnectionParam sagemakerConnectionParam =
sagemakerDataSourceProcessor.createConnectionParams(sagemakerDataSourceParamDTO);
Assertions.assertEquals("access key", sagemakerConnectionParam.getUserName());
Assertions.assertEquals("secret access key", sagemakerConnectionParam.getPassword());
Assertions.assertEquals("region", sagemakerConnectionParam.getAwsRegion());
}
@Test
void testTestConnection() {
SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO =
(SagemakerDataSourceParamDTO) sagemakerDataSourceProcessor.createDatasourceParamDTO(connectJson);
SagemakerConnectionParam connectionParam =
sagemakerDataSourceProcessor.createConnectionParams(sagemakerDataSourceParamDTO);
Assertions.assertFalse(sagemakerDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
try (
MockedConstruction<SagemakerClientWrapper> sshClientWrapperMockedConstruction =
Mockito.mockConstruction(SagemakerClientWrapper.class, (mock, context) -> {
Mockito.when(
mock.checkConnect())
.thenReturn(true);
})) {
Assertions.assertTrue(sagemakerDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
}
}
}

1
dolphinscheduler-datasource-plugin/pom.xml

@ -53,6 +53,7 @@
<module>dolphinscheduler-datasource-snowflake</module> <module>dolphinscheduler-datasource-snowflake</module>
<module>dolphinscheduler-datasource-vertica</module> <module>dolphinscheduler-datasource-vertica</module>
<module>dolphinscheduler-datasource-doris</module> <module>dolphinscheduler-datasource-doris</module>
<module>dolphinscheduler-datasource-sagemaker</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>

4
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java

@ -52,8 +52,8 @@ public enum DbType {
VERTICA(21, "vertica"), VERTICA(21, "vertica"),
HANA(22, "hana"), HANA(22, "hana"),
DORIS(23, "doris"), DORIS(23, "doris"),
ZEPPELIN(24, "zeppelin"); ZEPPELIN(24, "zeppelin"),
SAGEMAKER(25, "sagemaker");
private static final Map<Integer, DbType> DB_TYPE_MAP = private static final Map<Integer, DbType> DB_TYPE_MAP =
Arrays.stream(DbType.values()).collect(toMap(DbType::getCode, Functions.identity())); Arrays.stream(DbType.values()).collect(toMap(DbType::getCode, Functions.identity()));
@EnumValue @EnumValue

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml

@ -293,5 +293,9 @@
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId> <artifactId>commons-collections4</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

5
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml

@ -47,6 +47,11 @@
<groupId>com.amazonaws</groupId> <groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sagemaker</artifactId> <artifactId>aws-java-sdk-sagemaker</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-all</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

28
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java

@ -17,27 +17,55 @@
package org.apache.dolphinscheduler.plugin.task.sagemaker; package org.apache.dolphinscheduler.plugin.task.sagemaker;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.Objects;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.ToString; import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Getter @Getter
@Setter @Setter
@ToString @ToString
@Slf4j
public class SagemakerParameters extends AbstractParameters { public class SagemakerParameters extends AbstractParameters {
/** /**
* request script * request script
*/ */
private String sagemakerRequestJson; private String sagemakerRequestJson;
private String username;
private String password;
private String awsRegion;
private int datasource;
private String type;
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return StringUtils.isNotEmpty(sagemakerRequestJson); return StringUtils.isNotEmpty(sagemakerRequestJson);
} }
public SagemakerTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) {
DataSourceParameters dataSourceParameters =
(DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, datasource);
SagemakerTaskExecutionContext sagemakerTaskExecutionContext = new SagemakerTaskExecutionContext();
sagemakerTaskExecutionContext.setConnectionParams(
Objects.nonNull(dataSourceParameters) ? dataSourceParameters.getConnectionParams() : null);
return sagemakerTaskExecutionContext;
}
@Override
public ResourceParametersHelper getResources() {
ResourceParametersHelper resources = super.getResources();
resources.put(ResourceType.DATASOURCE, datasource);
return resources;
}
} }

33
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java

@ -23,13 +23,15 @@ import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -64,14 +66,16 @@ public class SagemakerTask extends AbstractRemoteTask {
*/ */
private SagemakerParameters parameters; private SagemakerParameters parameters;
private final AmazonSageMaker client; private AmazonSageMaker client;
private final PipelineUtils utils; private PipelineUtils utils;
private PipelineUtils.PipelineId pipelineId; private PipelineUtils.PipelineId pipelineId;
private SagemakerConnectionParam sagemakerConnectionParam;
private SagemakerTaskExecutionContext sagemakerTaskExecutionContext;
private TaskExecutionContext taskExecutionContext;
public SagemakerTask(TaskExecutionContext taskExecutionContext) { public SagemakerTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext); super(taskExecutionContext);
client = createClient(); this.taskExecutionContext = taskExecutionContext;
utils = new PipelineUtils();
} }
@Override @Override
@ -83,15 +87,24 @@ public class SagemakerTask extends AbstractRemoteTask {
public void init() { public void init() {
parameters = JSONUtils.parseObject(taskRequest.getTaskParams(), SagemakerParameters.class); parameters = JSONUtils.parseObject(taskRequest.getTaskParams(), SagemakerParameters.class);
log.info("Initialize Sagemaker task params {}", JSONUtils.toPrettyJsonString(parameters));
if (parameters == null) { if (parameters == null) {
throw new SagemakerTaskException("Sagemaker task params is empty"); throw new SagemakerTaskException("Sagemaker task params is empty");
} }
if (!parameters.checkParameters()) { if (!parameters.checkParameters()) {
throw new SagemakerTaskException("Sagemaker task params is not valid"); throw new SagemakerTaskException("Sagemaker task params is not valid");
} }
sagemakerTaskExecutionContext =
parameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
sagemakerConnectionParam =
(SagemakerConnectionParam) DataSourceUtils.buildConnectionParams(DbType.valueOf(parameters.getType()),
sagemakerTaskExecutionContext.getConnectionParams());
parameters.setUsername(sagemakerConnectionParam.getUserName());
parameters.setPassword(sagemakerConnectionParam.getPassword());
parameters.setAwsRegion(sagemakerConnectionParam.getAwsRegion());
log.info("Initialize Sagemaker task params {}", JSONUtils.toPrettyJsonString(parameters));
client = createClient();
utils = new PipelineUtils();
} }
@Override @Override
@ -170,9 +183,9 @@ public class SagemakerTask extends AbstractRemoteTask {
} }
protected AmazonSageMaker createClient() { protected AmazonSageMaker createClient() {
final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID); final String awsAccessKeyId = parameters.getUsername();
final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY); final String awsSecretAccessKey = parameters.getPassword();
final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION); final String awsRegion = parameters.getAwsRegion();
final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey); final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials); final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
// create a SageMaker client // create a SageMaker client

2
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java

@ -43,7 +43,7 @@ public class SagemakerTaskChannel implements TaskChannel {
@Override @Override
public ResourceParametersHelper getResources(String parameters) { public ResourceParametersHelper getResources(String parameters) {
return null; return JSONUtils.parseObject(parameters, SagemakerParameters.class).getResources();
} }
} }

45
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskExecutionContext.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.sagemaker;
import java.io.Serializable;
/**
* master/worker task transport
*/
public class SagemakerTaskExecutionContext implements Serializable {
/**
* connectionParams
*/
private String connectionParams;
public String getConnectionParams() {
return connectionParams;
}
public void setConnectionParams(String connectionParams) {
this.connectionParams = connectionParams;
}
@Override
public String toString() {
return "SagemakerTaskExecutionContext{" + "connectionParams='" + connectionParams + '\'' + '}';
}
}

34
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java

@ -18,19 +18,26 @@
package org.apache.dolphinscheduler.plugin.task.sagemaker; package org.apache.dolphinscheduler.plugin.task.sagemaker;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
@ -49,14 +56,30 @@ public class SagemakerTaskTest {
private AmazonSageMaker client; private AmazonSageMaker client;
private PipelineUtils pipelineUtils = new PipelineUtils(); private PipelineUtils pipelineUtils = new PipelineUtils();
private static final String MOCK_USERNAME = "lucky";
private static final String MOCK_PASSWORD = "root";
private static final String MOCK_TYPE = "SAGEMAKER";
private static final String MOCK_AWS_REGION = "REGION";
private static MockedStatic<DataSourceUtils> dataSourceUtilsStaticMock = null;
@BeforeEach @BeforeEach
public void before() { public void before() {
String parameters = buildParameters(); String parameters = buildParameters();
System.out.println(parameters);
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
ResourceParametersHelper resourceParametersHelper = Mockito.mock(ResourceParametersHelper.class);
SagemakerConnectionParam sagemakerConnectionParam = Mockito.mock(SagemakerConnectionParam.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters); Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
Mockito.when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper);
dataSourceUtilsStaticMock = Mockito.mockStatic(DataSourceUtils.class);
dataSourceUtilsStaticMock.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
.thenReturn(sagemakerConnectionParam);
client = Mockito.mock(AmazonSageMaker.class); client = Mockito.mock(AmazonSageMaker.class);
sagemakerTask = new SagemakerTask(taskExecutionContext); sagemakerTask = spy(new SagemakerTask(taskExecutionContext));
doReturn(client).when(sagemakerTask).createClient();
sagemakerTask.init(); sagemakerTask.init();
StartPipelineExecutionResult startPipelineExecutionResult = Mockito.mock(StartPipelineExecutionResult.class); StartPipelineExecutionResult startPipelineExecutionResult = Mockito.mock(StartPipelineExecutionResult.class);
@ -75,6 +98,11 @@ public class SagemakerTaskTest {
Mockito.lenient().when(client.describePipelineExecution(any())).thenReturn(describePipelineExecutionResult); Mockito.lenient().when(client.describePipelineExecution(any())).thenReturn(describePipelineExecutionResult);
} }
@AfterEach
public void afterEach() {
dataSourceUtilsStaticMock.close();
}
@Test @Test
public void testStartPipelineRequest() throws Exception { public void testStartPipelineRequest() throws Exception {
StartPipelineExecutionRequest request = sagemakerTask.createStartPipelineRequest(); StartPipelineExecutionRequest request = sagemakerTask.createStartPipelineRequest();
@ -105,6 +133,10 @@ public class SagemakerTaskTest {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
parameters.setSagemakerRequestJson(sagemakerRequestJson); parameters.setSagemakerRequestJson(sagemakerRequestJson);
parameters.setUsername(MOCK_USERNAME);
parameters.setPassword(MOCK_PASSWORD);
parameters.setAwsRegion(MOCK_AWS_REGION);
parameters.setType(MOCK_TYPE);
return JSONUtils.toJsonString(parameters); return JSONUtils.toJsonString(parameters);
} }

2
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java

@ -32,10 +32,12 @@ import java.util.Objects;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.ToString; import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Getter @Getter
@Setter @Setter
@ToString @ToString
@Slf4j
public class ZeppelinParameters extends AbstractParameters { public class ZeppelinParameters extends AbstractParameters {
/** /**

12
dolphinscheduler-ui/src/locales/en_US/datasource.ts

@ -70,6 +70,10 @@ export default {
user_password_tips: 'Please enter your password', user_password_tips: 'Please enter your password',
aws_region: 'Aws Region', aws_region: 'Aws Region',
aws_region_tips: 'Please enter AwsRegion', aws_region_tips: 'Please enter AwsRegion',
aws_access_key: 'AwsAccessKey',
aws_access_key_tips: 'Please enter AwsAccessKey',
aws_secret_access_key: 'AwsSecretAccessKey',
aws_secret_access_key_tips: 'Please enter AwsSecretAccessKey',
validation: 'Validation', validation: 'Validation',
mode_tips: 'Please select a mode', mode_tips: 'Please select a mode',
jdbc_format_tips: 'jdbc connection parameters is not a correct JSON format', jdbc_format_tips: 'jdbc connection parameters is not a correct JSON format',
@ -87,5 +91,11 @@ export default {
SecretAccessKey: 'SecretAccessKey', SecretAccessKey: 'SecretAccessKey',
SecretAccessKey_tips: 'Please input SecretAccessKey', SecretAccessKey_tips: 'Please input SecretAccessKey',
dbUser: 'DbUser', dbUser: 'DbUser',
dbUser_tips: 'Please input DbUser' dbUser_tips: 'Please input DbUser',
zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
zeppelin_rest_endpoint_tips: 'Please input zeppelin restEndpoint',
kubeConfig: 'kubeConfig',
kubeConfig_tips: 'Please input KubeConfig',
namespace: 'namespace',
namespace_tips: 'Please input namespace'
} }

10
dolphinscheduler-ui/src/locales/zh_CN/datasource.ts

@ -67,6 +67,10 @@ export default {
user_password_tips: '请输入密码', user_password_tips: '请输入密码',
aws_region: 'AwsRegion', aws_region: 'AwsRegion',
aws_region_tips: '请输入AwsRegion', aws_region_tips: '请输入AwsRegion',
aws_access_key: 'AwsAccessKey',
aws_access_key_tips: '请输入AwsAccessKey',
aws_secret_access_key: 'AwsSecretAccessKey',
aws_secret_access_key_tips: '请输入AwsSecretAccessKey',
validation: '验证', validation: '验证',
mode_tips: '请选择验证模式', mode_tips: '请选择验证模式',
jdbc_format_tips: 'jdbc连接参数不是一个正确的JSON格式', jdbc_format_tips: 'jdbc连接参数不是一个正确的JSON格式',
@ -86,5 +90,9 @@ export default {
dbUser: 'DbUser', dbUser: 'DbUser',
dbUser_tips: '请输入DbUser', dbUser_tips: '请输入DbUser',
zeppelin_rest_endpoint: 'zeppelinRestEndpoint', zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint' zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint',
kubeConfig: 'kubeConfig',
kubeConfig_tips: '请输入KubeConfig',
namespace: 'namespace',
namespace_tips: '请输入namespace'
} }

2
dolphinscheduler-ui/src/service/modules/data-source/types.ts

@ -40,6 +40,7 @@ type IDataBase =
| 'DORIS' | 'DORIS'
| 'KYUUBI' | 'KYUUBI'
| 'ZEPPELIN' | 'ZEPPELIN'
| 'SAGEMAKER'
type IDataBaseLabel = type IDataBaseLabel =
| 'MYSQL' | 'MYSQL'
@ -61,6 +62,7 @@ type IDataBaseLabel =
| 'SSH' | 'SSH'
| 'KYUUBI' | 'KYUUBI'
| 'ZEPPELIN' | 'ZEPPELIN'
| 'SAGEMAKER'
interface IDataSource { interface IDataSource {
id?: number id?: number

9
dolphinscheduler-ui/src/views/datasource/list/detail.tsx

@ -165,6 +165,7 @@ const DetailModal = defineComponent({
showPublicKey, showPublicKey,
modeOptions, modeOptions,
redShiftModeOptions, redShiftModeOptions,
sagemakerModeOption,
loading, loading,
saving, saving,
testing, testing,
@ -321,6 +322,8 @@ const DetailModal = defineComponent({
options={ options={
detailForm.type === 'REDSHIFT' detailForm.type === 'REDSHIFT'
? redShiftModeOptions ? redShiftModeOptions
: detailForm.type === 'SAGEMAKER'
? sagemakerModeOption
: modeOptions : modeOptions
} }
></NSelect> ></NSelect>
@ -497,7 +500,11 @@ const DetailModal = defineComponent({
/> />
</NFormItem> </NFormItem>
<NFormItem <NFormItem
v-show={showMode && detailForm.mode === 'IAM-accessKey'} v-show={
showMode &&
detailForm.mode === 'IAM-accessKey' &&
detailForm.type != 'SAGEMAKER'
}
label={t('datasource.dbUser')} label={t('datasource.dbUser')}
path='dbUser' path='dbUser'
show-require-mark show-require-mark

24
dolphinscheduler-ui/src/views/datasource/list/use-form.ts

@ -182,7 +182,8 @@ export function useForm(id?: number) {
if ( if (
!state.detailForm.dbUser && !state.detailForm.dbUser &&
state.showMode && state.showMode &&
state.detailForm.mode === 'IAM-accessKey' state.detailForm.mode === 'IAM-accessKey' &&
state.detailForm.type != 'SAGEMAKER'
) { ) {
return new Error(t('datasource.IAM-accessKey')) return new Error(t('datasource.IAM-accessKey'))
} }
@ -228,6 +229,12 @@ export function useForm(id?: number) {
label: 'IAM-accessKey', label: 'IAM-accessKey',
value: 'IAM-accessKey' value: 'IAM-accessKey'
} }
],
sagemakerModeOption: [
{
label: 'IAM-accessKey',
value: 'IAM-accessKey'
}
] ]
}) })
@ -239,8 +246,8 @@ export function useForm(id?: number) {
state.showHost = type !== 'ATHENA' state.showHost = type !== 'ATHENA'
state.showPort = type !== 'ATHENA' state.showPort = type !== 'ATHENA'
state.showAwsRegion = type === 'ATHENA' state.showAwsRegion = type === 'ATHENA' || type === 'SAGEMAKER'
state.showMode = ['AZURESQL', 'REDSHIFT'].includes(type) state.showMode = ['AZURESQL', 'REDSHIFT', 'SAGEMAKER'].includes(type)
if (type === 'ORACLE' && !id) { if (type === 'ORACLE' && !id) {
state.detailForm.connectType = 'ORACLE_SERVICE_NAME' state.detailForm.connectType = 'ORACLE_SERVICE_NAME'
@ -254,7 +261,7 @@ export function useForm(id?: number) {
} else { } else {
state.showPrincipal = false state.showPrincipal = false
} }
if (type === 'SSH' || type === 'ZEPPELIN') { if (type === 'SSH' || type === 'ZEPPELIN' || type === 'SAGEMAKER') {
state.showDataBaseName = false state.showDataBaseName = false
state.requiredDataBase = false state.requiredDataBase = false
state.showJDBCConnectParameters = false state.showJDBCConnectParameters = false
@ -267,6 +274,10 @@ export function useForm(id?: number) {
state.showPort = false state.showPort = false
state.showRestEndpoint = true state.showRestEndpoint = true
} }
if (type === 'SAGEMAKER') {
state.showHost = false
state.showPort = false
}
} else { } else {
state.showDataBaseName = true state.showDataBaseName = true
state.requiredDataBase = true state.requiredDataBase = true
@ -425,6 +436,11 @@ export const datasourceType: IDataBaseOptionKeys = {
value: 'DORIS', value: 'DORIS',
label: 'DORIS', label: 'DORIS',
defaultPort: 9030 defaultPort: 9030
},
SAGEMAKER: {
value: 'SAGEMAKER',
label: 'SAGEMAKER',
defaultPort: 0
} }
} }

5
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts

@ -142,6 +142,11 @@ export function useDatasource(
id: 23, id: 23,
code: 'DORIS', code: 'DORIS',
disabled: false disabled: false
},
{
id: 24,
code: 'SAGEMAKER',
disabled: false
} }
] ]

5
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -407,6 +407,11 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'SAGEMAKER') { if (data.taskType === 'SAGEMAKER') {
taskParams.sagemakerRequestJson = data.sagemakerRequestJson taskParams.sagemakerRequestJson = data.sagemakerRequestJson
taskParams.username = data.username
taskParams.password = data.password
taskParams.datasource = data.datasource
taskParams.type = data.type
taskParams.awsRegion = data.awsRegion
} }
if (data.taskType === 'PYTORCH') { if (data.taskType === 'PYTORCH') {
taskParams.script = data.script taskParams.script = data.script

8
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts

@ -43,7 +43,12 @@ export function userSagemaker({
workerGroup: 'default', workerGroup: 'default',
delayTime: 0, delayTime: 0,
timeout: 30, timeout: 30,
timeoutNotifyStrategy: ['WARN'] type: 'SAGEMAKER',
displayRows: 10,
timeoutNotifyStrategy: ['WARN'],
username: '',
password: '',
awsRegion: ''
} as INodeData) } as INodeData)
return { return {
@ -60,6 +65,7 @@ export function userSagemaker({
...Fields.useFailed(), ...Fields.useFailed(),
Fields.useDelayTime(model), Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model), ...Fields.useTimeoutAlarm(model),
...Fields.useDatasource(model),
...Fields.useSagemaker(model), ...Fields.useSagemaker(model),
Fields.usePreTasks() Fields.usePreTasks()
] as IJsonItem[], ] as IJsonItem[],

2
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -451,6 +451,8 @@ interface ITaskParams {
filterCondition?: string filterCondition?: string
listParameters?: Array<any> listParameters?: Array<any>
yarnQueue?: string yarnQueue?: string
awsRegion?: string
kubeConfig?: string
} }
interface INodeData interface INodeData

Loading…
Cancel
Save