From bc4326125da94eca35e3a1e4c4b0f545918e5e39 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 7 Sep 2023 11:55:31 +0800 Subject: [PATCH] Fix session/thread leak at ssh datasource (#14859) --- .../service/impl/DataSourceServiceImpl.java | 35 ++------ .../api/service/DataSourceServiceTest.java | 14 ++-- .../AbstractDataSourceProcessor.java | 14 ++++ .../api/datasource/DataSourceProcessor.java | 7 +- .../datasource/ssh/SshClientWrapper.java | 81 +++++++++++++++++++ .../ssh/param/SSHDataSourceProcessor.java | 22 ++--- .../ssh/SSHDataSourceProcessorTest.java | 30 +++---- 7 files changed, 137 insertions(+), 66 deletions(-) create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SshClientWrapper.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java index 6f0e5c50d7..c095c2bd62 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java @@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; @@ -55,7 +54,6 @@ import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -360,34 +358,15 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource @Override public Result checkConnection(DbType type, ConnectionParam connectionParam) { Result result = new Result<>(); - if (type == DbType.SSH) { - DataSourceProcessor sshDataSourceProcessor = DataSourceUtils.getDatasourceProcessor(type); - if (sshDataSourceProcessor.testConnection(connectionParam)) { - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.CONNECT_DATASOURCE_FAILURE); - } - return result; - } - try (Connection connection = DataSourceClientProvider.getAdHocConnection(type, connectionParam)) { - if (connection == null) { - log.error("Connection test to {} datasource failed, connectionParam:{}.", type.getDescp(), - connectionParam); - putMsg(result, Status.CONNECTION_TEST_FAILURE); - return result; - } - log.info("Connection test to {} datasource success, connectionParam:{}", type.getDescp(), - connectionParam); + DataSourceProcessor sshDataSourceProcessor = DataSourceUtils.getDatasourceProcessor(type); + boolean connectivity = sshDataSourceProcessor.checkDataSourceConnectivity(connectionParam); + if (connectivity) { putMsg(result, Status.SUCCESS); - return result; - } catch (Exception e) { - String message = Optional.of(e).map(Throwable::getCause) - .map(Throwable::getMessage) - .orElse(e.getMessage()); - log.error("Datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type, - connectionParam, message); - return new Result<>(Status.CONNECTION_TEST_FAILURE); + } else { + putMsg(result, Status.CONNECTION_TEST_FAILURE); } + log.info("Connection test to {} datasource success, connectionParam:{}", type.name(), connectionParam); + return result; } /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java index bc1616cb7e..411979542b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; @@ -501,16 +502,17 @@ public class DataSourceServiceTest { ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(postgreSqlDatasourceParam); try ( - MockedStatic mockedStaticDataSourceClientProvider = - Mockito.mockStatic(DataSourceClientProvider.class)) { - DataSourceClientProvider clientProvider = Mockito.mock(DataSourceClientProvider.class); + MockedStatic mockedStaticDataSourceClientProvider = + Mockito.mockStatic(DataSourceUtils.class)) { + DataSourceProcessor dataSourceProcessor = Mockito.mock(DataSourceProcessor.class); + + Mockito.when(DataSourceUtils.getDatasourceProcessor(Mockito.any())).thenReturn(dataSourceProcessor); + Mockito.when(dataSourceProcessor.checkDataSourceConnectivity(Mockito.any())).thenReturn(false); Result result = dataSourceService.checkConnection(dataSourceType, connectionParam); Assertions.assertEquals(Status.CONNECTION_TEST_FAILURE.getCode(), result.getCode().intValue()); - Connection connection = Mockito.mock(Connection.class); - Mockito.when(DataSourceClientProvider.getAdHocConnection(Mockito.any(), Mockito.any())) - .thenReturn(connection); + Mockito.when(dataSourceProcessor.checkDataSourceConnectivity(Mockito.any())).thenReturn(true); result = dataSourceService.checkConnection(dataSourceType, connectionParam); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java index 8981b4e502..fcc744d91a 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java @@ -26,15 +26,19 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; +import java.sql.Connection; import java.text.MessageFormat; import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +import lombok.extern.slf4j.Slf4j; + import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Sets; +@Slf4j public abstract class AbstractDataSourceProcessor implements DataSourceProcessor { private static final Pattern IPV4_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.\\,]+$"); @@ -111,4 +115,14 @@ public abstract class AbstractDataSourceProcessor implements DataSourceProcessor return MessageFormat.format("{0}@{1}@{2}@{3}", dbType.getDescp(), baseConnectionParam.getUser(), PasswordUtils.encodePassword(baseConnectionParam.getPassword()), baseConnectionParam.getJdbcUrl()); } + + @Override + public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) { + try (Connection connection = getConnection(connectionParam)) { + return true; + } catch (Exception e) { + log.error("Check datasource connectivity for: {} error", getDbType().name(), e); + return false; + } + } } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java index 170b391c45..99f424e3dd 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java @@ -93,17 +93,16 @@ public interface DataSourceProcessor { * @param connectionParam connectionParam * @return {@link Connection} */ + // todo: Change to return a ConnectionWrapper Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException, IOException; /** - * test connection, use for not jdbc datasource + * test connection * * @param connectionParam connectionParam * @return true if connection is valid */ - default boolean testConnection(ConnectionParam connectionParam) { - return false; - } + boolean checkDataSourceConnectivity(ConnectionParam connectionParam); /** * @return {@link DbType} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SshClientWrapper.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SshClientWrapper.java new file mode 100644 index 0000000000..b9a60ec7f1 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SshClientWrapper.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.ssh; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.commons.lang3.StringUtils; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.session.ClientSession; +import org.apache.sshd.common.config.keys.loader.KeyPairResourceLoader; +import org.apache.sshd.common.util.security.SecurityUtils; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.KeyPair; +import java.time.Duration; +import java.util.Collection; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SshClientWrapper implements AutoCloseable { + + private SshClient sshClient; + + private final ClientSession clientSession; + + public SshClientWrapper( + String ip, Integer port, String userName, String password, String privateKey) + throws IOException, + GeneralSecurityException { + checkNotNull(ip); + checkNotNull(port); + checkNotNull(userName); + clientSession = createSession(ip, port, userName); + if (StringUtils.isNotEmpty(password)) { + clientSession.addPasswordIdentity(password); + } + if (StringUtils.isNotEmpty(privateKey)) { + KeyPairResourceLoader loader = SecurityUtils.getKeyPairResourceParser(); + Collection keyPairCollection = loader.loadKeyPairs(null, null, null, privateKey); + for (KeyPair keyPair : keyPairCollection) { + clientSession.addPublicKeyIdentity(keyPair); + } + } + } + + public boolean isAuth() throws IOException { + return clientSession.auth().verify(Duration.ofSeconds(10)).isSuccess(); + } + + private ClientSession createSession(String ip, Integer port, String userName) throws IOException { + sshClient = SshClient.setUpDefaultClient(); + sshClient.start(); + return sshClient.connect(userName, ip, port).verify(Duration.ofSeconds(10)).getSession(); + } + + @Override + public void close() throws Exception { + try ( + ClientSession clientSession1 = clientSession; + SshClient sshClient1 = sshClient) { + // closed the resources + } + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/param/SSHDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/param/SSHDataSourceProcessor.java index 37dee2979c..b20a776014 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/param/SSHDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/param/SSHDataSourceProcessor.java @@ -21,13 +21,11 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; -import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils; +import org.apache.dolphinscheduler.plugin.datasource.ssh.SshClientWrapper; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.commons.lang3.StringUtils; -import org.apache.sshd.client.SshClient; -import org.apache.sshd.client.session.ClientSession; import java.sql.Connection; import java.text.MessageFormat; @@ -114,15 +112,21 @@ public class SSHDataSourceProcessor implements DataSourceProcessor { } @Override - public boolean testConnection(ConnectionParam connectionParam) { + public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) { SSHConnectionParam baseConnectionParam = (SSHConnectionParam) connectionParam; - SshClient client = SshClient.setUpDefaultClient(); - client.start(); - try { - ClientSession session = SSHUtils.getSession(client, baseConnectionParam); - return session.auth().verify().isSuccess(); + try ( + SshClientWrapper sshClientWrapper = new SshClientWrapper( + baseConnectionParam.getHost(), + baseConnectionParam.getPort(), + baseConnectionParam.getUser(), + baseConnectionParam.getPassword(), + baseConnectionParam.getPublicKey())) { + + return sshClientWrapper.isAuth(); } catch (Exception e) { + log.error("ssh test connection failed", e); return false; + } } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/test/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SSHDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/test/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SSHDataSourceProcessorTest.java index 51aee19edd..e4ffab1ec3 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/test/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SSHDataSourceProcessorTest.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/test/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SSHDataSourceProcessorTest.java @@ -17,9 +17,6 @@ package org.apache.dolphinscheduler.plugin.datasource.ssh; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.when; - import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam; import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourceParamDTO; @@ -27,15 +24,11 @@ import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourceProc import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; -import org.apache.sshd.client.session.ClientSession; - -import java.io.IOException; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.MockedStatic; +import org.mockito.MockedConstruction; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -100,20 +93,19 @@ public class SSHDataSourceProcessorTest { } @Test - void testTestConnection() throws IOException { + void testTestConnection() { SSHDataSourceParamDTO sshDataSourceParamDTO = (SSHDataSourceParamDTO) sshDataSourceProcessor.createDatasourceParamDTO(connectJson); ConnectionParam connectionParam = sshDataSourceProcessor.createConnectionParams(sshDataSourceParamDTO); - MockedStatic sshConnectionUtilsMockedStatic = org.mockito.Mockito.mockStatic(SSHUtils.class); - sshConnectionUtilsMockedStatic.when(() -> SSHUtils.getSession(Mockito.any(), Mockito.any())).thenReturn(null); - Assertions.assertFalse(sshDataSourceProcessor.testConnection(connectionParam)); - - ClientSession clientSession = Mockito.mock(ClientSession.class, RETURNS_DEEP_STUBS); - sshConnectionUtilsMockedStatic.when(() -> SSHUtils.getSession(Mockito.any(), Mockito.any())) - .thenReturn(clientSession); - when(clientSession.auth().verify().isSuccess()).thenReturn(true); - Assertions.assertTrue(sshDataSourceProcessor.testConnection(connectionParam)); - + Assertions.assertFalse(sshDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + + try ( + MockedConstruction sshClientWrapperMockedConstruction = + Mockito.mockConstruction(SshClientWrapper.class, (mock, context) -> { + Mockito.when(mock.isAuth()).thenReturn(true); + })) { + Assertions.assertTrue(sshDataSourceProcessor.checkDataSourceConnectivity(connectionParam)); + } } }