From e99c5950b9a745878bf24e784b11fd0cbf68c5ba Mon Sep 17 00:00:00 2001 From: chenrj <102030622+xdu-chenrj@users.noreply.github.com> Date: Mon, 30 Oct 2023 16:44:54 +0800 Subject: [PATCH] [DSIP-19] Support zeppelin connections in the connection center, as well as external connections to the connection center in zeppelin tasks (#14434) * Refactoring zeppelin task plugin with connections managed in connection center --------- Co-authored-by: Eric Gao --- dolphinscheduler-bom/pom.xml | 7 + .../dolphinscheduler-datasource-all/pom.xml | 5 + .../pom.xml | 50 +++++++ .../zeppelin/ZeppelinClientWrapper.java | 55 ++++++++ .../zeppelin/ZeppelinDataSourceChannel.java | 37 +++++ .../ZeppelinDataSourceChannelFactory.java | 38 +++++ .../datasource/zeppelin/ZeppelinUtils.java | 36 +++++ .../param/ZeppelinConnectionParam.java | 35 +++++ .../param/ZeppelinDataSourceParamDTO.java | 34 +++++ .../param/ZeppelinDataSourceProcessor.java | 131 ++++++++++++++++++ .../ZeppelinDataSourceProcessorTest.java | 107 ++++++++++++++ dolphinscheduler-datasource-plugin/pom.xml | 1 + .../e2e/pages/datasource/DataSourcePage.java | 7 + .../dolphinscheduler/spi/enums/DbType.java | 3 +- .../dolphinscheduler-task-zeppelin/pom.xml | 5 + .../task/zeppelin/ZeppelinParameters.java | 23 ++- .../plugin/task/zeppelin/ZeppelinTask.java | 43 +++--- .../task/zeppelin/ZeppelinTaskChannel.java | 2 +- .../ZeppelinTaskExecutionContext.java | 46 ++++++ .../task/zeppelin/ZeppelinTaskTest.java | 33 ++++- .../src/locales/zh_CN/datasource.ts | 4 +- .../src/service/modules/data-source/types.ts | 3 + .../src/views/datasource/list/detail.tsx | 16 +++ .../src/views/datasource/list/use-form.ts | 18 ++- .../components/node/fields/use-datasource.ts | 5 + .../components/node/fields/use-zeppelin.ts | 33 ----- .../task/components/node/format-data.ts | 2 + .../components/node/tasks/use-zeppelin.ts | 8 +- 28 files changed, 723 insertions(+), 64 deletions(-) create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index 6e4770d60a..ccd6b3a4f7 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -117,6 +117,7 @@ 3.17.2 3.23.3 1.2.1 + 0.10.1 1.17.6 3.19.0 @@ -894,6 +895,12 @@ ${snowflake-jdbc.version} + + org.apache.zeppelin + zeppelin-client + ${zeppelin-client.version} + + com.google.protobuf protobuf-java diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml index 586948cab6..71905dea45 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml @@ -132,6 +132,11 @@ dolphinscheduler-datasource-vertica ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-datasource-zeppelin + ${project.version} + org.apache.dolphinscheduler diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml new file mode 100644 index 0000000000..90c7f1ece5 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml @@ -0,0 +1,50 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-datasource-plugin + dev-SNAPSHOT + + + dolphinscheduler-datasource-zeppelin + jar + ${project.artifactId} + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + + org.apache.dolphinscheduler + dolphinscheduler-datasource-api + ${project.version} + + + + org.apache.zeppelin + zeppelin-client + + + + diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java new file mode 100644 index 0000000000..4a6c840e40 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.zeppelin; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.zeppelin.client.ClientConfig; +import org.apache.zeppelin.client.ZeppelinClient; + +import lombok.extern.slf4j.Slf4j; +@Slf4j +public class ZeppelinClientWrapper implements AutoCloseable { + + private ZeppelinClient zeppelinClient; + + public ZeppelinClientWrapper(String restEndpoint) + throws Exception { + checkNotNull(restEndpoint); + ClientConfig clientConfig = new ClientConfig(restEndpoint); + zeppelinClient = new ZeppelinClient(clientConfig); + } + + public boolean checkConnect(String username, String password) { + try { + // If the login fails, an exception will be thrown directly + zeppelinClient.login(username, password); + String version = zeppelinClient.getVersion(); + log.info("zeppelin client connects to server successfully, version is {}", version); + return true; + } catch (Exception e) { + log.info("zeppelin client failed to connect to the server"); + return false; + } + } + + @Override + public void close() throws Exception { + + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java new file mode 100644 index 0000000000..c8e33611e7 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.zeppelin; + +import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient; +import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; +import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient; +import org.apache.dolphinscheduler.spi.enums.DbType; + +public class ZeppelinDataSourceChannel implements DataSourceChannel { + + @Override + public AdHocDataSourceClient createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + throw new UnsupportedOperationException("Zeppelin AdHocDataSourceClient is not supported"); + } + + @Override + public PooledDataSourceClient createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { + throw new UnsupportedOperationException("Zeppelin AdHocDataSourceClient is not supported"); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java new file mode 100644 index 0000000000..692819cf78 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.zeppelin; + +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; +import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory; + +import com.google.auto.service.AutoService; + +@AutoService(DataSourceChannelFactory.class) +public class ZeppelinDataSourceChannelFactory implements DataSourceChannelFactory { + + @Override + public DataSourceChannel create() { + return new ZeppelinDataSourceChannel(); + } + + @Override + public String getName() { + return "zeppelin"; + } + +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java new file mode 100644 index 0000000000..308af03d8f --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.zeppelin; + +import org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam; + +import org.apache.zeppelin.client.ClientConfig; +import org.apache.zeppelin.client.ZeppelinClient; + +public class ZeppelinUtils { + + private ZeppelinUtils() { + throw new IllegalStateException("Utility class"); + } + + public static ZeppelinClient getZeppelinClient(ZeppelinConnectionParam connectionParam) throws Exception { + ClientConfig clientConfig = new ClientConfig(connectionParam.getRestEndpoint()); + return new ZeppelinClient(clientConfig); + } + +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java new file mode 100644 index 0000000000..2c716a7e71 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.zeppelin.param; + +import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonInclude; + +@Data +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ZeppelinConnectionParam implements ConnectionParam { + + protected String username; + + protected String password; + + protected String restEndpoint; +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java new file mode 100644 index 0000000000..ae5a1d7025 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.zeppelin.param; + +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import lombok.Data; + +@Data +public class ZeppelinDataSourceParamDTO extends BaseDataSourceParamDTO { + + protected String restEndpoint; + + @Override + public DbType getType() { + return DbType.ZEPPELIN; + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java new file mode 100644 index 0000000000..bf2795959e --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.zeppelin.param; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; +import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; +import org.apache.dolphinscheduler.plugin.datasource.zeppelin.ZeppelinClientWrapper; +import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.text.MessageFormat; + +import lombok.extern.slf4j.Slf4j; + +import com.google.auto.service.AutoService; + +@AutoService(DataSourceProcessor.class) +@Slf4j +public class ZeppelinDataSourceProcessor implements DataSourceProcessor { + + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, ZeppelinDataSourceParamDTO.class); + } + + @Override + public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParamDTO) { + ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = (ZeppelinDataSourceParamDTO) datasourceParamDTO; + if (StringUtils.isEmpty(zeppelinDataSourceParamDTO.getRestEndpoint()) + || StringUtils.isEmpty(zeppelinDataSourceParamDTO.getUserName())) { + throw new IllegalArgumentException("zeppelin datasource param is not valid"); + } + } + + @Override + public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) { + ZeppelinConnectionParam baseConnectionParam = (ZeppelinConnectionParam) connectionParam; + return MessageFormat.format("{0}@{1}@{2}@{3}", dbType.getDescp(), baseConnectionParam.getRestEndpoint(), + baseConnectionParam.getUsername(), PasswordUtils.encodePassword(baseConnectionParam.getPassword())); + } + + @Override + public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { + ZeppelinConnectionParam connectionParams = (ZeppelinConnectionParam) createConnectionParams(connectionJson); + ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = new ZeppelinDataSourceParamDTO(); + + zeppelinDataSourceParamDTO.setUserName(connectionParams.getUsername()); + zeppelinDataSourceParamDTO.setPassword(connectionParams.getPassword()); + zeppelinDataSourceParamDTO.setRestEndpoint(connectionParams.getRestEndpoint()); + return zeppelinDataSourceParamDTO; + } + + @Override + public ZeppelinConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) { + ZeppelinDataSourceParamDTO zeppelinDataSourceParam = (ZeppelinDataSourceParamDTO) datasourceParam; + ZeppelinConnectionParam zeppelinConnectionParam = new ZeppelinConnectionParam(); + zeppelinConnectionParam.setUsername(zeppelinDataSourceParam.getUserName()); + zeppelinConnectionParam.setPassword(zeppelinDataSourceParam.getPassword()); + zeppelinConnectionParam.setRestEndpoint(zeppelinDataSourceParam.getRestEndpoint()); + + return zeppelinConnectionParam; + } + + @Override + public ConnectionParam createConnectionParams(String connectionJson) { + return JSONUtils.parseObject(connectionJson, ZeppelinConnectionParam.class); + } + + @Override + public String getDatasourceDriver() { + return ""; + } + + @Override + public String getValidationQuery() { + return ""; + } + + @Override + public String getJdbcUrl(ConnectionParam connectionParam) { + return ""; + } + + @Override + public Connection getConnection(ConnectionParam connectionParam) { + return null; + } + + @Override + public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) { + ZeppelinConnectionParam baseConnectionParam = (ZeppelinConnectionParam) connectionParam; + try ( + ZeppelinClientWrapper zeppelinClientWrapper = + new ZeppelinClientWrapper(baseConnectionParam.getRestEndpoint())) { + return zeppelinClientWrapper.checkConnect(baseConnectionParam.username, baseConnectionParam.password); + } catch (Exception e) { + log.error("zeppelin client failed to connect to the server", e); + return false; + } + } + + @Override + public DbType getDbType() { + return DbType.ZEPPELIN; + } + + @Override + public DataSourceProcessor create() { + return new ZeppelinDataSourceProcessor(); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java new file mode 100644 index 0000000000..05be02e722 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.zeppelin; + +import org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam; +import org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinDataSourceProcessor; +import org.apache.dolphinscheduler.spi.enums.DbType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class ZeppelinDataSourceProcessorTest { + + private ZeppelinDataSourceProcessor zeppelinDataSourceProcessor; + + private String connectJson = + "{\"username\":\"lucky\",\"password\":\"123456\",\"restEndpoint\":\"https://dolphinscheduler.com:8080\"}"; + + @BeforeEach + public void init() { + zeppelinDataSourceProcessor = new ZeppelinDataSourceProcessor(); + } + + @Test + void testCheckDatasourceParam() { + ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = new ZeppelinDataSourceParamDTO(); + Assertions.assertThrows(IllegalArgumentException.class, + () -> zeppelinDataSourceProcessor.checkDatasourceParam(zeppelinDataSourceParamDTO)); + zeppelinDataSourceParamDTO.setRestEndpoint("http://dolphinscheduler.com:8080"); + Assertions.assertThrows(IllegalArgumentException.class, + () -> zeppelinDataSourceProcessor.checkDatasourceParam(zeppelinDataSourceParamDTO)); + zeppelinDataSourceParamDTO.setUserName("root"); + Assertions + .assertDoesNotThrow(() -> zeppelinDataSourceProcessor.checkDatasourceParam(zeppelinDataSourceParamDTO)); + } + + @Test + void testGetDatasourceUniqueId() { + ZeppelinConnectionParam zeppelinConnectionParam = new ZeppelinConnectionParam(); + zeppelinConnectionParam.setRestEndpoint("https://dolphinscheduler.com:8080"); + zeppelinConnectionParam.setUsername("root"); + zeppelinConnectionParam.setPassword("123456"); + Assertions.assertEquals("zeppelin@https://dolphinscheduler.com:8080@root@123456", + zeppelinDataSourceProcessor.getDatasourceUniqueId(zeppelinConnectionParam, DbType.ZEPPELIN)); + + } + + @Test + void testCreateDatasourceParamDTO() { + ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = + (ZeppelinDataSourceParamDTO) zeppelinDataSourceProcessor.createDatasourceParamDTO(connectJson); + Assertions.assertEquals("lucky", zeppelinDataSourceParamDTO.getUserName()); + Assertions.assertEquals("123456", zeppelinDataSourceParamDTO.getPassword()); + Assertions.assertEquals("https://dolphinscheduler.com:8080", zeppelinDataSourceParamDTO.getRestEndpoint()); + } + + @Test + void testCreateConnectionParams() { + ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = + (ZeppelinDataSourceParamDTO) zeppelinDataSourceProcessor.createDatasourceParamDTO(connectJson); + ZeppelinConnectionParam zeppelinConnectionParam = + zeppelinDataSourceProcessor.createConnectionParams(zeppelinDataSourceParamDTO); + Assertions.assertEquals("lucky", zeppelinConnectionParam.getUsername()); + Assertions.assertEquals("123456", zeppelinConnectionParam.getPassword()); + Assertions.assertEquals("https://dolphinscheduler.com:8080", zeppelinConnectionParam.getRestEndpoint()); + } + + @Test + void testTestConnection() { + ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = + (ZeppelinDataSourceParamDTO) zeppelinDataSourceProcessor.createDatasourceParamDTO(connectJson); + ZeppelinConnectionParam connectionParam = + zeppelinDataSourceProcessor.createConnectionParams(zeppelinDataSourceParamDTO); + Assertions.assertFalse(zeppelinDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + try ( + MockedConstruction sshClientWrapperMockedConstruction = + Mockito.mockConstruction(ZeppelinClientWrapper.class, (mock, context) -> { + Mockito.when( + mock.checkConnect(connectionParam.getUsername(), connectionParam.getPassword())) + .thenReturn(true); + })) { + Assertions.assertTrue(zeppelinDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + } + } +} diff --git a/dolphinscheduler-datasource-plugin/pom.xml b/dolphinscheduler-datasource-plugin/pom.xml index 79261be779..91882bb0b6 100644 --- a/dolphinscheduler-datasource-plugin/pom.xml +++ b/dolphinscheduler-datasource-plugin/pom.xml @@ -48,6 +48,7 @@ dolphinscheduler-datasource-azure-sql dolphinscheduler-datasource-dameng dolphinscheduler-datasource-ssh + dolphinscheduler-datasource-zeppelin dolphinscheduler-datasource-databend dolphinscheduler-datasource-snowflake dolphinscheduler-datasource-vertica diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java index 1f6c76fd44..3b8633443d 100644 --- a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java +++ b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java @@ -186,5 +186,12 @@ public class DataSourcePage extends NavBarPage implements NavBarPage.NavBarItem @FindBy(className = "btn-test-connection") private WebElement btnTestConnection; + + @FindBys({ + @FindBy(className = "input-zeppelin_rest_endpoint"), + @FindBy(tagName = "input"), + }) + private WebElement inputZeppelinRestEndpoint; + } } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java index 169f0ad9bf..967eec3b86 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java @@ -51,7 +51,8 @@ public enum DbType { SNOWFLAKE(20, "snowflake"), VERTICA(21, "vertica"), HANA(22, "hana"), - DORIS(23, "doris"); + DORIS(23, "doris"), + ZEPPELIN(24, "zeppelin"); private static final Map DB_TYPE_MAP = Arrays.stream(DbType.values()).collect(toMap(DbType::getCode, Functions.identity())); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml index 69a5a66994..d136977526 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml @@ -37,6 +37,11 @@ dolphinscheduler-task-api ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-datasource-all + ${project.version} + org.apache.zeppelin zeppelin-client diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java index 8b1c1a341d..b3ddcbc791 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java @@ -17,13 +17,17 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin; +import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.commons.lang3.StringUtils; import java.util.Collections; import java.util.List; +import java.util.Objects; import lombok.Getter; import lombok.Setter; @@ -45,10 +49,12 @@ public class ZeppelinParameters extends AbstractParameters { private String parameters; private String username; private String password; + private int datasource; + private String type; @Override public boolean checkParameters() { - return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.restEndpoint); + return StringUtils.isNotEmpty(this.noteId); } @Override @@ -56,4 +62,19 @@ public class ZeppelinParameters extends AbstractParameters { return Collections.emptyList(); } + public ZeppelinTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) { + DataSourceParameters dataSourceParameters = + (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, datasource); + ZeppelinTaskExecutionContext zeppelinTaskExecutionContext = new ZeppelinTaskExecutionContext(); + zeppelinTaskExecutionContext.setConnectionParams( + Objects.nonNull(dataSourceParameters) ? dataSourceParameters.getConnectionParams() : null); + return zeppelinTaskExecutionContext; + } + + @Override + public ResourceParametersHelper getResources() { + ResourceParametersHelper resources = super.getResources(); + resources.put(ResourceType.DATASOURCE, datasource); + return resources; + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java index 0f25527c08..f6fa8e89ff 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java @@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; +import org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.client.ClientConfig; @@ -59,6 +62,10 @@ public class ZeppelinTask extends AbstractRemoteTask { */ private ZeppelinClient zClient; + private ZeppelinConnectionParam zeppelinConnectionParam; + + private ZeppelinTaskExecutionContext zeppelinTaskExecutionContext; + /** * constructor * @@ -76,6 +83,14 @@ public class ZeppelinTask extends AbstractRemoteTask { if (this.zeppelinParameters == null || !this.zeppelinParameters.checkParameters()) { throw new ZeppelinTaskException("zeppelin task params is not valid"); } + zeppelinTaskExecutionContext = + zeppelinParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper()); + zeppelinConnectionParam = (ZeppelinConnectionParam) DataSourceUtils + .buildConnectionParams(DbType.valueOf(zeppelinParameters.getType()), + zeppelinTaskExecutionContext.getConnectionParams()); + zeppelinParameters.setUsername(zeppelinConnectionParam.getUsername()); + zeppelinParameters.setPassword(zeppelinConnectionParam.getPassword()); + zeppelinParameters.setRestEndpoint(zeppelinConnectionParam.getRestEndpoint()); log.info("Initialize zeppelin task params:{}", JSONUtils.toPrettyJsonString(taskParams)); this.zClient = getZeppelinClient(); } @@ -111,11 +126,8 @@ public class ZeppelinTask extends AbstractRemoteTask { Status status = Status.FINISHED; // If in production, clone the note and run the cloned one for stability if (productionNoteDirectory != null) { - final String cloneNotePath = String.format( - "%s%s_%s", - productionNoteDirectory, - noteId, - DateUtils.getTimestampString()); + final String cloneNotePath = + String.format("%s%s_%s", productionNoteDirectory, noteId, DateUtils.getTimestampString()); noteId = this.zClient.cloneNote(noteId, cloneNotePath); } @@ -124,11 +136,8 @@ public class ZeppelinTask extends AbstractRemoteTask { final List paragraphResultList = noteResult.getParagraphResultList(); StringBuilder resultContentBuilder = new StringBuilder(); for (ParagraphResult paragraphResult : paragraphResultList) { - resultContentBuilder.append( - String.format( - "paragraph_id: %s, paragraph_result: %s\n", - paragraphResult.getParagraphId(), - paragraphResult.getResultInText())); + resultContentBuilder.append(String.format("paragraph_id: %s, paragraph_result: %s\n", + paragraphResult.getParagraphId(), paragraphResult.getResultInText())); status = paragraphResult.getStatus(); // we treat note execution as failure if any paragraph in the note fails // status will be further processed in method mapStatusToExitCode below @@ -221,27 +230,21 @@ public class ZeppelinTask extends AbstractRemoteTask { final String paragraphId = this.zeppelinParameters.getParagraphId(); if (paragraphId == null) { log.info("trying terminate zeppelin task, taskId: {}, noteId: {}", - this.taskExecutionContext.getTaskInstanceId(), - noteId); + this.taskExecutionContext.getTaskInstanceId(), noteId); Unirest.config().defaultBaseUrl(restEndpoint + "/api"); Unirest.delete("/notebook/job/{noteId}").routeParam("noteId", noteId).asJson(); - log.info("zeppelin task terminated, taskId: {}, noteId: {}", - this.taskExecutionContext.getTaskInstanceId(), + log.info("zeppelin task terminated, taskId: {}, noteId: {}", this.taskExecutionContext.getTaskInstanceId(), noteId); } else { log.info("trying terminate zeppelin task, taskId: {}, noteId: {}, paragraphId: {}", - this.taskExecutionContext.getTaskInstanceId(), - noteId, - paragraphId); + this.taskExecutionContext.getTaskInstanceId(), noteId, paragraphId); try { this.zClient.cancelParagraph(noteId, paragraphId); } catch (Exception e) { throw new TaskException("cancel paragraph error", e); } log.info("zeppelin task terminated, taskId: {}, noteId: {}, paragraphId: {}", - this.taskExecutionContext.getTaskInstanceId(), - noteId, - paragraphId); + this.taskExecutionContext.getTaskInstanceId(), noteId, paragraphId); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java index c2f63f3cf6..d9e7318dc4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java @@ -44,6 +44,6 @@ public class ZeppelinTaskChannel implements TaskChannel { @Override public ResourceParametersHelper getResources(String parameters) { - return null; + return JSONUtils.parseObject(parameters, ZeppelinParameters.class).getResources(); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java new file mode 100644 index 0000000000..4cc09e5f4f --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.zeppelin; + +import java.io.Serializable; + +/** + * master/worker task transport + */ +public class ZeppelinTaskExecutionContext implements Serializable { + + /** + * connectionParams + */ + private String connectionParams; + + public String getConnectionParams() { + return connectionParams; + } + + public void setConnectionParams(String connectionParams) { + this.connectionParams = connectionParams; + } + + @Override + public String toString() { + return "ZeppelinTaskExecutionContext{" + + "connectionParams='" + connectionParams + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java index 8c5cb11377..ffaf85b207 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java @@ -28,10 +28,13 @@ import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; +import org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.zeppelin.client.NoteResult; import org.apache.zeppelin.client.ParagraphResult; @@ -40,6 +43,7 @@ import org.apache.zeppelin.client.ZeppelinClient; import java.util.Map; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,6 +63,8 @@ public class ZeppelinTaskTest { private static final String MOCK_REST_ENDPOINT = "localhost:8080"; private static final String MOCK_CLONE_NOTE_ID = "3GYJR92R8"; private static final String MOCK_PRODUCTION_DIRECTORY = "/prod/"; + private static final String MOCK_TYPE = "ZEPPELIN"; + private static MockedStatic dataSourceUtilsStaticMock = null; private final ObjectMapper mapper = new ObjectMapper(); private ZeppelinClient zClient; @@ -80,9 +86,15 @@ public class ZeppelinTaskTest { @BeforeEach public void before() throws Exception { - String zeppelinParameters = buildZeppelinTaskParameters(); + String zeppelinTaskParameters = buildZeppelinTaskParameters(); TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class); - when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParameters); + ResourceParametersHelper resourceParametersHelper = mock(ResourceParametersHelper.class); + ZeppelinConnectionParam zeppelinConnectionParam = mock(ZeppelinConnectionParam.class); + when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinTaskParameters); + when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper); + dataSourceUtilsStaticMock = Mockito.mockStatic(DataSourceUtils.class); + dataSourceUtilsStaticMock.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any())) + .thenReturn(zeppelinConnectionParam); this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext)); this.zClient = mock(ZeppelinClient.class); @@ -93,6 +105,11 @@ public class ZeppelinTaskTest { this.zeppelinTask.init(); } + @AfterEach + public void afterEach() { + dataSourceUtilsStaticMock.close(); + } + @Test public void testHandleWithParagraphExecutionSuccess() throws Exception { when(this.zClient.executeParagraph(any(), any(), any(Map.class))).thenReturn(this.paragraphResult); @@ -158,9 +175,11 @@ public class ZeppelinTaskTest { @Test public void testHandleWithNoteExecutionSuccess() throws Exception { - String zeppelinParametersWithNoParagraphId = buildZeppelinTaskParametersWithNoParagraphId(); + String zeppelinTaskParametersWithNoParagraphId = buildZeppelinTaskParametersWithNoParagraphId(); TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class); - when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId); + ResourceParametersHelper resourceParametersHelper = mock(ResourceParametersHelper.class); + when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinTaskParametersWithNoParagraphId); + when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper); this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext)); this.zClient = mock(ZeppelinClient.class); this.noteResult = mock(NoteResult.class); @@ -183,6 +202,9 @@ public class ZeppelinTaskTest { try (MockedStatic mockedStaticDateUtils = Mockito.mockStatic(DateUtils.class)) { when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId); + ResourceParametersHelper resourceParametersHelper = mock(ResourceParametersHelper.class); + when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper); + this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext)); this.zClient = mock(ZeppelinClient.class); @@ -211,6 +233,7 @@ public class ZeppelinTaskTest { zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID); zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT); zeppelinParameters.setParameters(MOCK_PARAMETERS); + zeppelinParameters.setType(MOCK_TYPE); return JSONUtils.toJsonString(zeppelinParameters); } @@ -220,6 +243,7 @@ public class ZeppelinTaskTest { zeppelinParameters.setNoteId(MOCK_NOTE_ID); zeppelinParameters.setParameters(MOCK_PARAMETERS); zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT); + zeppelinParameters.setType(MOCK_TYPE); return JSONUtils.toJsonString(zeppelinParameters); } @@ -230,6 +254,7 @@ public class ZeppelinTaskTest { zeppelinParameters.setParameters(MOCK_PARAMETERS); zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT); zeppelinParameters.setProductionNoteDirectory(MOCK_PRODUCTION_DIRECTORY); + zeppelinParameters.setType(MOCK_TYPE); return JSONUtils.toJsonString(zeppelinParameters); } diff --git a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts index 6edc3befea..434f339dce 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts @@ -84,5 +84,7 @@ export default { SecretAccessKey: 'SecretAccessKey', SecretAccessKey_tips: '请输入SecretAccessKey', dbUser: 'DbUser', - dbUser_tips: '请输入DbUser' + dbUser_tips: '请输入DbUser', + zeppelin_rest_endpoint: 'zeppelinRestEndpoint', + zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint' } diff --git a/dolphinscheduler-ui/src/service/modules/data-source/types.ts b/dolphinscheduler-ui/src/service/modules/data-source/types.ts index 59173151af..8e84c7887b 100644 --- a/dolphinscheduler-ui/src/service/modules/data-source/types.ts +++ b/dolphinscheduler-ui/src/service/modules/data-source/types.ts @@ -39,6 +39,7 @@ type IDataBase = | 'HANA' | 'DORIS' | 'KYUUBI' + | 'ZEPPELIN' type IDataBaseLabel = | 'MYSQL' @@ -59,6 +60,7 @@ type IDataBaseLabel = | 'OCEANBASE' | 'SSH' | 'KYUUBI' + | 'ZEPPELIN' interface IDataSource { id?: number @@ -80,6 +82,7 @@ interface IDataSource { connectType?: string other?: object endpoint?: string + restEndpoint?: string MSIClientId?: string dbUser?: string compatibleMode?: string diff --git a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx index f44ca4704e..599977accf 100644 --- a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx +++ b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx @@ -154,6 +154,7 @@ const DetailModal = defineComponent({ requiredDataBase, showHost, showPort, + showRestEndpoint, showAwsRegion, showCompatibleMode, showConnectType, @@ -251,6 +252,21 @@ const DetailModal = defineComponent({ placeholder={t('datasource.ip_tips')} /> + + +