diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index ccd6b3a4f7..77b9d0c0c8 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -120,6 +120,7 @@ 0.10.1 1.17.6 3.19.0 + 0.10.1 diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml index 71905dea45..cb9c78f9d3 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml @@ -143,5 +143,10 @@ dolphinscheduler-datasource-doris ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-datasource-sagemaker + ${project.version} + diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/pom.xml new file mode 100644 index 0000000000..870abed97a --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/pom.xml @@ -0,0 +1,50 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-datasource-plugin + dev-SNAPSHOT + + + dolphinscheduler-datasource-sagemaker + jar + ${project.artifactId} + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + + org.apache.dolphinscheduler + dolphinscheduler-datasource-api + ${project.version} + + + + com.amazonaws + aws-java-sdk-sagemaker + + + + diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerClientWrapper.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerClientWrapper.java new file mode 100644 index 0000000000..2996789221 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerClientWrapper.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.sagemaker; + +import static com.google.common.base.Preconditions.checkNotNull; + +import lombok.extern.slf4j.Slf4j; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.sagemaker.AmazonSageMaker; +import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder; +import com.amazonaws.services.sagemaker.model.ListNotebookInstancesRequest; + +@Slf4j +public class SagemakerClientWrapper implements AutoCloseable { + + private AmazonSageMaker amazonSageMaker; + + public SagemakerClientWrapper(String accessKey, String secretAccessKey, String region) { + checkNotNull(accessKey, "sagemaker accessKey cannot be null"); + checkNotNull(secretAccessKey, "sagemaker secretAccessKey cannot be null"); + checkNotNull(region, "sagemaker region cannot be null"); + + final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretAccessKey); + final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials); + // create a SageMaker client + amazonSageMaker = AmazonSageMakerClientBuilder.standard().withCredentials(awsCredentialsProvider) + .withRegion(region).build(); + } + + public boolean checkConnect() { + try { + // If listing notebook instances fails, an exception will be thrown directly + ListNotebookInstancesRequest request = new ListNotebookInstancesRequest(); + amazonSageMaker.listNotebookInstances(request); + log.info("sagemaker client connects to server successfully"); + return true; + } catch (Exception e) { + log.info("sagemaker client failed to connect to the server"); + } + return false; + } + + @Override + public void close() { + + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannel.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannel.java new file mode 100644 index 0000000000..03b96cf8a3 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannel.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.sagemaker; + +import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient; +import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; +import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient; +import org.apache.dolphinscheduler.spi.enums.DbType; + +public class SagemakerDataSourceChannel implements DataSourceChannel { + + @Override + public AdHocDataSourceClient createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + throw new UnsupportedOperationException("Sagemaker AdHocDataSourceClient is not supported"); + } + + @Override + public PooledDataSourceClient createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + throw new UnsupportedOperationException("Sagemaker AdHocDataSourceClient is not supported"); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannelFactory.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannelFactory.java new file mode 100644 index 0000000000..04ab93f36f --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceChannelFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.sagemaker; + +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory; + +import com.google.auto.service.AutoService; + +@AutoService(DataSourceChannelFactory.class) +public class SagemakerDataSourceChannelFactory implements DataSourceChannelFactory { + + @Override + public DataSourceChannel create() { + return new SagemakerDataSourceChannel(); + } + + @Override + public String getName() { + return "sagemaker"; + } + +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerConnectionParam.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerConnectionParam.java new file mode 100644 index 0000000000..19bf72eb56 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerConnectionParam.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.sagemaker.param; + +import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonInclude; + +@Data +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SagemakerConnectionParam implements ConnectionParam { + + protected String userName; + + protected String password; + + protected String awsRegion; +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceParamDTO.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceParamDTO.java new file mode 100644 index 0000000000..902b2b1d3b --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceParamDTO.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.sagemaker.param; + +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import lombok.Data; + +@Data +public class SagemakerDataSourceParamDTO extends BaseDataSourceParamDTO { + + protected String awsRegion; + + @Override + public DbType getType() { + return DbType.SAGEMAKER; + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceProcessor.java new file mode 100644 index 0000000000..6a534a5f0e --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/param/SagemakerDataSourceProcessor.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.sagemaker.param; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; +import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; +import org.apache.dolphinscheduler.plugin.datasource.sagemaker.SagemakerClientWrapper; +import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.text.MessageFormat; + +import lombok.extern.slf4j.Slf4j; + +import com.google.auto.service.AutoService; + +@AutoService(DataSourceProcessor.class) +@Slf4j +public class SagemakerDataSourceProcessor implements DataSourceProcessor { + + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, SagemakerDataSourceParamDTO.class); + } + + @Override + public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParamDTO) { + SagemakerDataSourceParamDTO sageMakerDataSourceParamDTO = (SagemakerDataSourceParamDTO) datasourceParamDTO; + if (StringUtils.isEmpty(sageMakerDataSourceParamDTO.getUserName()) + || StringUtils.isEmpty(sageMakerDataSourceParamDTO.getPassword()) + || StringUtils.isEmpty(sageMakerDataSourceParamDTO.getAwsRegion())) { + throw new IllegalArgumentException("sagemaker datasource param is not valid"); + } + } + + @Override + public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) { + SagemakerConnectionParam baseConnectionParam = (SagemakerConnectionParam) connectionParam; + return MessageFormat.format("{0}@{1}@{2}@{3}", dbType.getDescp(), + PasswordUtils.encodePassword(baseConnectionParam.getUserName()), + PasswordUtils.encodePassword(baseConnectionParam.getPassword()), + PasswordUtils.encodePassword(baseConnectionParam.getAwsRegion())); + } + + // SageMaker + @Override + public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { + SagemakerConnectionParam connectionParams = (SagemakerConnectionParam) createConnectionParams(connectionJson); + SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO = new SagemakerDataSourceParamDTO(); + + sagemakerDataSourceParamDTO.setUserName(connectionParams.getUserName()); + sagemakerDataSourceParamDTO.setPassword(connectionParams.getPassword()); + sagemakerDataSourceParamDTO.setAwsRegion(connectionParams.getAwsRegion()); + return sagemakerDataSourceParamDTO; + } + + @Override + public SagemakerConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) { + SagemakerDataSourceParamDTO sageMakerDataSourceParam = (SagemakerDataSourceParamDTO) datasourceParam; + SagemakerConnectionParam sageMakerConnectionParam = new SagemakerConnectionParam(); + sageMakerConnectionParam.setUserName(sageMakerDataSourceParam.getUserName()); + sageMakerConnectionParam.setPassword(sageMakerDataSourceParam.getPassword()); + sageMakerConnectionParam.setAwsRegion(sageMakerDataSourceParam.getAwsRegion()); + + return sageMakerConnectionParam; + } + + @Override + public ConnectionParam createConnectionParams(String connectionJson) { + return JSONUtils.parseObject(connectionJson, SagemakerConnectionParam.class); + } + + @Override + public String getDatasourceDriver() { + return ""; + } + + @Override + public String getValidationQuery() { + return ""; + } + + @Override + public String getJdbcUrl(ConnectionParam connectionParam) { + return ""; + } + + @Override + public Connection getConnection(ConnectionParam connectionParam) { + return null; + } + + @Override + public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) { + SagemakerConnectionParam baseConnectionParam = (SagemakerConnectionParam) connectionParam; + try ( + SagemakerClientWrapper sagemakerClientWrapper = + new SagemakerClientWrapper(baseConnectionParam.userName, + baseConnectionParam.password, baseConnectionParam.awsRegion)) { + return sagemakerClientWrapper.checkConnect(); + } catch (Exception e) { + log.error("sagemaker client failed to connect to the server", e); + return false; + } + } + + @Override + public DbType getDbType() { + return DbType.SAGEMAKER; + } + + @Override + public DataSourceProcessor create() { + return new SagemakerDataSourceProcessor(); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java new file mode 100644 index 0000000000..424f478699 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sagemaker/SagemakerDataSourceProcessorTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.sagemaker; + +import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam; +import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerDataSourceProcessor; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) + +public class SagemakerDataSourceProcessorTest { + + private SagemakerDataSourceProcessor sagemakerDataSourceProcessor; + + private String connectJson = + "{\"userName\":\"access key\",\"password\":\"secret access key\",\"awsRegion\":\"region\"}"; + + @BeforeEach + public void init() { + sagemakerDataSourceProcessor = new SagemakerDataSourceProcessor(); + } + + @Test + void testCheckDatasourceParam() { + SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO = new SagemakerDataSourceParamDTO(); + Assertions.assertThrows(IllegalArgumentException.class, + () -> sagemakerDataSourceProcessor.checkDatasourceParam(sagemakerDataSourceParamDTO)); + sagemakerDataSourceParamDTO.setUserName("access key"); + Assertions.assertThrows(IllegalArgumentException.class, + () -> sagemakerDataSourceProcessor.checkDatasourceParam(sagemakerDataSourceParamDTO)); + sagemakerDataSourceParamDTO.setPassword("secret access key"); + Assertions.assertThrows(IllegalArgumentException.class, + () -> sagemakerDataSourceProcessor.checkDatasourceParam(sagemakerDataSourceParamDTO)); + sagemakerDataSourceParamDTO.setAwsRegion("region"); + + Assertions + .assertDoesNotThrow( + () -> sagemakerDataSourceProcessor.checkDatasourceParam(sagemakerDataSourceParamDTO)); + } + + @Test + void testGetDatasourceUniqueId() { + SagemakerConnectionParam sagemakerConnectionParam = new SagemakerConnectionParam(); + sagemakerConnectionParam.setUserName("access key"); + sagemakerConnectionParam.setPassword("secret access key"); + sagemakerConnectionParam.setAwsRegion("region"); + Assertions.assertEquals("sagemaker@access key@secret access key@region", + sagemakerDataSourceProcessor.getDatasourceUniqueId(sagemakerConnectionParam, DbType.SAGEMAKER)); + + } + + @Test + void testCreateDatasourceParamDTO() { + SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO = + (SagemakerDataSourceParamDTO) sagemakerDataSourceProcessor.createDatasourceParamDTO(connectJson); + Assertions.assertEquals("access key", sagemakerDataSourceParamDTO.getUserName()); + Assertions.assertEquals("secret access key", sagemakerDataSourceParamDTO.getPassword()); + Assertions.assertEquals("region", sagemakerDataSourceParamDTO.getAwsRegion()); + } + + @Test + void testCreateConnectionParams() { + SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO = + (SagemakerDataSourceParamDTO) sagemakerDataSourceProcessor.createDatasourceParamDTO(connectJson); + SagemakerConnectionParam sagemakerConnectionParam = + sagemakerDataSourceProcessor.createConnectionParams(sagemakerDataSourceParamDTO); + Assertions.assertEquals("access key", sagemakerConnectionParam.getUserName()); + Assertions.assertEquals("secret access key", sagemakerConnectionParam.getPassword()); + Assertions.assertEquals("region", sagemakerConnectionParam.getAwsRegion()); + } + + @Test + void testTestConnection() { + SagemakerDataSourceParamDTO sagemakerDataSourceParamDTO = + (SagemakerDataSourceParamDTO) sagemakerDataSourceProcessor.createDatasourceParamDTO(connectJson); + SagemakerConnectionParam connectionParam = + sagemakerDataSourceProcessor.createConnectionParams(sagemakerDataSourceParamDTO); + Assertions.assertFalse(sagemakerDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + + try ( + MockedConstruction sshClientWrapperMockedConstruction = + Mockito.mockConstruction(SagemakerClientWrapper.class, (mock, context) -> { + Mockito.when( + mock.checkConnect()) + .thenReturn(true); + })) { + Assertions.assertTrue(sagemakerDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + } + + } +} diff --git a/dolphinscheduler-datasource-plugin/pom.xml b/dolphinscheduler-datasource-plugin/pom.xml index 91882bb0b6..fb08aa7f6d 100644 --- a/dolphinscheduler-datasource-plugin/pom.xml +++ b/dolphinscheduler-datasource-plugin/pom.xml @@ -53,6 +53,7 @@ dolphinscheduler-datasource-snowflake dolphinscheduler-datasource-vertica dolphinscheduler-datasource-doris + dolphinscheduler-datasource-sagemaker diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java index 967eec3b86..2dd8958924 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java @@ -52,8 +52,8 @@ public enum DbType { VERTICA(21, "vertica"), HANA(22, "hana"), DORIS(23, "doris"), - ZEPPELIN(24, "zeppelin"); - + ZEPPELIN(24, "zeppelin"), + SAGEMAKER(25, "sagemaker"); private static final Map DB_TYPE_MAP = Arrays.stream(DbType.values()).collect(toMap(DbType::getCode, Functions.identity())); @EnumValue diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml index 85d8b92f82..b115a1b6d1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml @@ -293,5 +293,9 @@ org.apache.commons commons-collections4 + + org.projectlombok + lombok + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml index dfb769a46c..7caaf28641 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/pom.xml @@ -47,6 +47,11 @@ com.amazonaws aws-java-sdk-sagemaker + + org.apache.dolphinscheduler + dolphinscheduler-datasource-all + ${project.version} + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java index 3b33eded1a..4d2c1cace4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerParameters.java @@ -17,27 +17,55 @@ package org.apache.dolphinscheduler.plugin.task.sagemaker; +import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.commons.lang3.StringUtils; +import java.util.Objects; + import lombok.Getter; import lombok.Setter; import lombok.ToString; +import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString +@Slf4j public class SagemakerParameters extends AbstractParameters { /** * request script */ private String sagemakerRequestJson; + private String username; + private String password; + private String awsRegion; + private int datasource; + private String type; @Override public boolean checkParameters() { return StringUtils.isNotEmpty(sagemakerRequestJson); } + public SagemakerTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) { + DataSourceParameters dataSourceParameters = + (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, datasource); + SagemakerTaskExecutionContext sagemakerTaskExecutionContext = new SagemakerTaskExecutionContext(); + sagemakerTaskExecutionContext.setConnectionParams( + Objects.nonNull(dataSourceParameters) ? dataSourceParameters.getConnectionParams() : null); + return sagemakerTaskExecutionContext; + } + + @Override + public ResourceParametersHelper getResources() { + ResourceParametersHelper resources = super.getResources(); + resources.put(ResourceType.DATASOURCE, datasource); + return resources; + } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java index 0e8a296151..d04f5a3fac 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java @@ -23,13 +23,15 @@ import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; +import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; +import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.commons.lang3.StringUtils; @@ -64,14 +66,16 @@ public class SagemakerTask extends AbstractRemoteTask { */ private SagemakerParameters parameters; - private final AmazonSageMaker client; - private final PipelineUtils utils; + private AmazonSageMaker client; + private PipelineUtils utils; private PipelineUtils.PipelineId pipelineId; + private SagemakerConnectionParam sagemakerConnectionParam; + private SagemakerTaskExecutionContext sagemakerTaskExecutionContext; + private TaskExecutionContext taskExecutionContext; public SagemakerTask(TaskExecutionContext taskExecutionContext) { super(taskExecutionContext); - client = createClient(); - utils = new PipelineUtils(); + this.taskExecutionContext = taskExecutionContext; } @Override @@ -83,15 +87,24 @@ public class SagemakerTask extends AbstractRemoteTask { public void init() { parameters = JSONUtils.parseObject(taskRequest.getTaskParams(), SagemakerParameters.class); - - log.info("Initialize Sagemaker task params {}", JSONUtils.toPrettyJsonString(parameters)); if (parameters == null) { throw new SagemakerTaskException("Sagemaker task params is empty"); } if (!parameters.checkParameters()) { throw new SagemakerTaskException("Sagemaker task params is not valid"); } + sagemakerTaskExecutionContext = + parameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper()); + sagemakerConnectionParam = + (SagemakerConnectionParam) DataSourceUtils.buildConnectionParams(DbType.valueOf(parameters.getType()), + sagemakerTaskExecutionContext.getConnectionParams()); + parameters.setUsername(sagemakerConnectionParam.getUserName()); + parameters.setPassword(sagemakerConnectionParam.getPassword()); + parameters.setAwsRegion(sagemakerConnectionParam.getAwsRegion()); + log.info("Initialize Sagemaker task params {}", JSONUtils.toPrettyJsonString(parameters)); + client = createClient(); + utils = new PipelineUtils(); } @Override @@ -170,9 +183,9 @@ public class SagemakerTask extends AbstractRemoteTask { } protected AmazonSageMaker createClient() { - final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID); - final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY); - final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION); + final String awsAccessKeyId = parameters.getUsername(); + final String awsSecretAccessKey = parameters.getPassword(); + final String awsRegion = parameters.getAwsRegion(); final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey); final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials); // create a SageMaker client diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java index 9fd8910831..4dcdb051e9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskChannel.java @@ -43,7 +43,7 @@ public class SagemakerTaskChannel implements TaskChannel { @Override public ResourceParametersHelper getResources(String parameters) { - return null; + return JSONUtils.parseObject(parameters, SagemakerParameters.class).getResources(); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskExecutionContext.java new file mode 100644 index 0000000000..68b3590663 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskExecutionContext.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sagemaker; + +import java.io.Serializable; + +/** + * master/worker task transport + */ + +public class SagemakerTaskExecutionContext implements Serializable { + + /** + * connectionParams + */ + private String connectionParams; + + public String getConnectionParams() { + return connectionParams; + } + + public void setConnectionParams(String connectionParams) { + this.connectionParams = connectionParams; + } + + @Override + public String toString() { + return "SagemakerTaskExecutionContext{" + "connectionParams='" + connectionParams + '\'' + '}'; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java index 07dd86d567..bbe3136dbf 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java @@ -18,19 +18,26 @@ package org.apache.dolphinscheduler.plugin.task.sagemaker; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; +import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.commons.io.IOUtils; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -49,14 +56,30 @@ public class SagemakerTaskTest { private AmazonSageMaker client; private PipelineUtils pipelineUtils = new PipelineUtils(); + private static final String MOCK_USERNAME = "lucky"; + private static final String MOCK_PASSWORD = "root"; + private static final String MOCK_TYPE = "SAGEMAKER"; + private static final String MOCK_AWS_REGION = "REGION"; + + private static MockedStatic dataSourceUtilsStaticMock = null; + @BeforeEach public void before() { String parameters = buildParameters(); + System.out.println(parameters); TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + ResourceParametersHelper resourceParametersHelper = Mockito.mock(ResourceParametersHelper.class); + SagemakerConnectionParam sagemakerConnectionParam = Mockito.mock(SagemakerConnectionParam.class); Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters); + Mockito.when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper); + + dataSourceUtilsStaticMock = Mockito.mockStatic(DataSourceUtils.class); + dataSourceUtilsStaticMock.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any())) + .thenReturn(sagemakerConnectionParam); client = Mockito.mock(AmazonSageMaker.class); - sagemakerTask = new SagemakerTask(taskExecutionContext); + sagemakerTask = spy(new SagemakerTask(taskExecutionContext)); + doReturn(client).when(sagemakerTask).createClient(); sagemakerTask.init(); StartPipelineExecutionResult startPipelineExecutionResult = Mockito.mock(StartPipelineExecutionResult.class); @@ -75,6 +98,11 @@ public class SagemakerTaskTest { Mockito.lenient().when(client.describePipelineExecution(any())).thenReturn(describePipelineExecutionResult); } + @AfterEach + public void afterEach() { + dataSourceUtilsStaticMock.close(); + } + @Test public void testStartPipelineRequest() throws Exception { StartPipelineExecutionRequest request = sagemakerTask.createStartPipelineRequest(); @@ -105,6 +133,10 @@ public class SagemakerTaskTest { throw new RuntimeException(e); } parameters.setSagemakerRequestJson(sagemakerRequestJson); + parameters.setUsername(MOCK_USERNAME); + parameters.setPassword(MOCK_PASSWORD); + parameters.setAwsRegion(MOCK_AWS_REGION); + parameters.setType(MOCK_TYPE); return JSONUtils.toJsonString(parameters); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java index b3ddcbc791..c073627c91 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java @@ -32,10 +32,12 @@ import java.util.Objects; import lombok.Getter; import lombok.Setter; import lombok.ToString; +import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString +@Slf4j public class ZeppelinParameters extends AbstractParameters { /** diff --git a/dolphinscheduler-ui/src/locales/en_US/datasource.ts b/dolphinscheduler-ui/src/locales/en_US/datasource.ts index 134bf015b1..e9b799b16e 100644 --- a/dolphinscheduler-ui/src/locales/en_US/datasource.ts +++ b/dolphinscheduler-ui/src/locales/en_US/datasource.ts @@ -70,6 +70,10 @@ export default { user_password_tips: 'Please enter your password', aws_region: 'Aws Region', aws_region_tips: 'Please enter AwsRegion', + aws_access_key: 'AwsAccessKey', + aws_access_key_tips: 'Please enter AwsAccessKey', + aws_secret_access_key: 'AwsSecretAccessKey', + aws_secret_access_key_tips: 'Please enter AwsSecretAccessKey', validation: 'Validation', mode_tips: 'Please select a mode', jdbc_format_tips: 'jdbc connection parameters is not a correct JSON format', @@ -87,5 +91,11 @@ export default { SecretAccessKey: 'SecretAccessKey', SecretAccessKey_tips: 'Please input SecretAccessKey', dbUser: 'DbUser', - dbUser_tips: 'Please input DbUser' + dbUser_tips: 'Please input DbUser', + zeppelin_rest_endpoint: 'zeppelinRestEndpoint', + zeppelin_rest_endpoint_tips: 'Please input zeppelin restEndpoint', + kubeConfig: 'kubeConfig', + kubeConfig_tips: 'Please input KubeConfig', + namespace: 'namespace', + namespace_tips: 'Please input namespace' } diff --git a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts index 434f339dce..7aa797a591 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts @@ -67,6 +67,10 @@ export default { user_password_tips: '请输入密码', aws_region: 'AwsRegion', aws_region_tips: '请输入AwsRegion', + aws_access_key: 'AwsAccessKey', + aws_access_key_tips: '请输入AwsAccessKey', + aws_secret_access_key: 'AwsSecretAccessKey', + aws_secret_access_key_tips: '请输入AwsSecretAccessKey', validation: '验证', mode_tips: '请选择验证模式', jdbc_format_tips: 'jdbc连接参数不是一个正确的JSON格式', @@ -86,5 +90,9 @@ export default { dbUser: 'DbUser', dbUser_tips: '请输入DbUser', zeppelin_rest_endpoint: 'zeppelinRestEndpoint', - zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint' + zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint', + kubeConfig: 'kubeConfig', + kubeConfig_tips: '请输入KubeConfig', + namespace: 'namespace', + namespace_tips: '请输入namespace' } diff --git a/dolphinscheduler-ui/src/service/modules/data-source/types.ts b/dolphinscheduler-ui/src/service/modules/data-source/types.ts index 8e84c7887b..a5fbb1a8a5 100644 --- a/dolphinscheduler-ui/src/service/modules/data-source/types.ts +++ b/dolphinscheduler-ui/src/service/modules/data-source/types.ts @@ -40,6 +40,7 @@ type IDataBase = | 'DORIS' | 'KYUUBI' | 'ZEPPELIN' + | 'SAGEMAKER' type IDataBaseLabel = | 'MYSQL' @@ -61,6 +62,7 @@ type IDataBaseLabel = | 'SSH' | 'KYUUBI' | 'ZEPPELIN' + | 'SAGEMAKER' interface IDataSource { id?: number diff --git a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx index 599977accf..3a7c1e6074 100644 --- a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx +++ b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx @@ -165,6 +165,7 @@ const DetailModal = defineComponent({ showPublicKey, modeOptions, redShiftModeOptions, + sagemakerModeOption, loading, saving, testing, @@ -321,6 +322,8 @@ const DetailModal = defineComponent({ options={ detailForm.type === 'REDSHIFT' ? redShiftModeOptions + : detailForm.type === 'SAGEMAKER' + ? sagemakerModeOption : modeOptions } > @@ -497,7 +500,11 @@ const DetailModal = defineComponent({ /> yarnQueue?: string + awsRegion?: string + kubeConfig?: string } interface INodeData