Browse Source

Fix session/thread leak at ssh datasource (#14859)

3.2.1-prepare
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
bc4326125d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
  2. 14
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
  3. 14
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java
  4. 7
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java
  5. 81
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SshClientWrapper.java
  6. 22
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/param/SSHDataSourceProcessor.java
  7. 30
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/test/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SSHDataSourceProcessorTest.java

27
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java

@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@ -55,7 +54,6 @@ import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -360,34 +358,15 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
@Override @Override
public Result<Object> checkConnection(DbType type, ConnectionParam connectionParam) { public Result<Object> checkConnection(DbType type, ConnectionParam connectionParam) {
Result<Object> result = new Result<>(); Result<Object> result = new Result<>();
if (type == DbType.SSH) {
DataSourceProcessor sshDataSourceProcessor = DataSourceUtils.getDatasourceProcessor(type); DataSourceProcessor sshDataSourceProcessor = DataSourceUtils.getDatasourceProcessor(type);
if (sshDataSourceProcessor.testConnection(connectionParam)) { boolean connectivity = sshDataSourceProcessor.checkDataSourceConnectivity(connectionParam);
if (connectivity) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.CONNECT_DATASOURCE_FAILURE);
}
return result;
}
try (Connection connection = DataSourceClientProvider.getAdHocConnection(type, connectionParam)) {
if (connection == null) {
log.error("Connection test to {} datasource failed, connectionParam:{}.", type.getDescp(),
connectionParam);
putMsg(result, Status.CONNECTION_TEST_FAILURE); putMsg(result, Status.CONNECTION_TEST_FAILURE);
return result;
} }
log.info("Connection test to {} datasource success, connectionParam:{}", type.getDescp(), log.info("Connection test to {} datasource success, connectionParam:{}", type.name(), connectionParam);
connectionParam);
putMsg(result, Status.SUCCESS);
return result; return result;
} catch (Exception e) {
String message = Optional.of(e).map(Throwable::getCause)
.map(Throwable::getMessage)
.orElse(e.getMessage());
log.error("Datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type,
connectionParam, message);
return new Result<>(Status.CONNECTION_TEST_FAILURE);
}
} }
/** /**

14
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java

@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
@ -501,16 +502,17 @@ public class DataSourceServiceTest {
ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(postgreSqlDatasourceParam); ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(postgreSqlDatasourceParam);
try ( try (
MockedStatic<DataSourceClientProvider> mockedStaticDataSourceClientProvider = MockedStatic<DataSourceUtils> mockedStaticDataSourceClientProvider =
Mockito.mockStatic(DataSourceClientProvider.class)) { Mockito.mockStatic(DataSourceUtils.class)) {
DataSourceClientProvider clientProvider = Mockito.mock(DataSourceClientProvider.class); DataSourceProcessor dataSourceProcessor = Mockito.mock(DataSourceProcessor.class);
Mockito.when(DataSourceUtils.getDatasourceProcessor(Mockito.any())).thenReturn(dataSourceProcessor);
Mockito.when(dataSourceProcessor.checkDataSourceConnectivity(Mockito.any())).thenReturn(false);
Result result = dataSourceService.checkConnection(dataSourceType, connectionParam); Result result = dataSourceService.checkConnection(dataSourceType, connectionParam);
Assertions.assertEquals(Status.CONNECTION_TEST_FAILURE.getCode(), result.getCode().intValue()); Assertions.assertEquals(Status.CONNECTION_TEST_FAILURE.getCode(), result.getCode().intValue());
Connection connection = Mockito.mock(Connection.class); Mockito.when(dataSourceProcessor.checkDataSourceConnectivity(Mockito.any())).thenReturn(true);
Mockito.when(DataSourceClientProvider.getAdHocConnection(Mockito.any(), Mockito.any()))
.thenReturn(connection);
result = dataSourceService.checkConnection(dataSourceType, connectionParam); result = dataSourceService.checkConnection(dataSourceType, connectionParam);
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
} }

14
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java

@ -26,15 +26,19 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@Slf4j
public abstract class AbstractDataSourceProcessor implements DataSourceProcessor { public abstract class AbstractDataSourceProcessor implements DataSourceProcessor {
private static final Pattern IPV4_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.\\,]+$"); private static final Pattern IPV4_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.\\,]+$");
@ -111,4 +115,14 @@ public abstract class AbstractDataSourceProcessor implements DataSourceProcessor
return MessageFormat.format("{0}@{1}@{2}@{3}", dbType.getDescp(), baseConnectionParam.getUser(), return MessageFormat.format("{0}@{1}@{2}@{3}", dbType.getDescp(), baseConnectionParam.getUser(),
PasswordUtils.encodePassword(baseConnectionParam.getPassword()), baseConnectionParam.getJdbcUrl()); PasswordUtils.encodePassword(baseConnectionParam.getPassword()), baseConnectionParam.getJdbcUrl());
} }
@Override
public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) {
try (Connection connection = getConnection(connectionParam)) {
return true;
} catch (Exception e) {
log.error("Check datasource connectivity for: {} error", getDbType().name(), e);
return false;
}
}
} }

7
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java

@ -93,17 +93,16 @@ public interface DataSourceProcessor {
* @param connectionParam connectionParam * @param connectionParam connectionParam
* @return {@link Connection} * @return {@link Connection}
*/ */
// todo: Change to return a ConnectionWrapper
Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException, IOException; Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException, IOException;
/** /**
* test connection, use for not jdbc datasource * test connection
* *
* @param connectionParam connectionParam * @param connectionParam connectionParam
* @return true if connection is valid * @return true if connection is valid
*/ */
default boolean testConnection(ConnectionParam connectionParam) { boolean checkDataSourceConnectivity(ConnectionParam connectionParam);
return false;
}
/** /**
* @return {@link DbType} * @return {@link DbType}

81
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SshClientWrapper.java

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.ssh;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.config.keys.loader.KeyPairResourceLoader;
import org.apache.sshd.common.util.security.SecurityUtils;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.time.Duration;
import java.util.Collection;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SshClientWrapper implements AutoCloseable {
private SshClient sshClient;
private final ClientSession clientSession;
public SshClientWrapper(
String ip, Integer port, String userName, String password, String privateKey)
throws IOException,
GeneralSecurityException {
checkNotNull(ip);
checkNotNull(port);
checkNotNull(userName);
clientSession = createSession(ip, port, userName);
if (StringUtils.isNotEmpty(password)) {
clientSession.addPasswordIdentity(password);
}
if (StringUtils.isNotEmpty(privateKey)) {
KeyPairResourceLoader loader = SecurityUtils.getKeyPairResourceParser();
Collection<KeyPair> keyPairCollection = loader.loadKeyPairs(null, null, null, privateKey);
for (KeyPair keyPair : keyPairCollection) {
clientSession.addPublicKeyIdentity(keyPair);
}
}
}
public boolean isAuth() throws IOException {
return clientSession.auth().verify(Duration.ofSeconds(10)).isSuccess();
}
private ClientSession createSession(String ip, Integer port, String userName) throws IOException {
sshClient = SshClient.setUpDefaultClient();
sshClient.start();
return sshClient.connect(userName, ip, port).verify(Duration.ofSeconds(10)).getSession();
}
@Override
public void close() throws Exception {
try (
ClientSession clientSession1 = clientSession;
SshClient sshClient1 = sshClient) {
// closed the resources
}
}
}

22
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/main/java/org/apache/dolphinscheduler/plugin/datasource/ssh/param/SSHDataSourceProcessor.java

@ -21,13 +21,11 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils; import org.apache.dolphinscheduler.plugin.datasource.ssh.SshClientWrapper;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
import java.sql.Connection; import java.sql.Connection;
import java.text.MessageFormat; import java.text.MessageFormat;
@ -114,15 +112,21 @@ public class SSHDataSourceProcessor implements DataSourceProcessor {
} }
@Override @Override
public boolean testConnection(ConnectionParam connectionParam) { public boolean checkDataSourceConnectivity(ConnectionParam connectionParam) {
SSHConnectionParam baseConnectionParam = (SSHConnectionParam) connectionParam; SSHConnectionParam baseConnectionParam = (SSHConnectionParam) connectionParam;
SshClient client = SshClient.setUpDefaultClient(); try (
client.start(); SshClientWrapper sshClientWrapper = new SshClientWrapper(
try { baseConnectionParam.getHost(),
ClientSession session = SSHUtils.getSession(client, baseConnectionParam); baseConnectionParam.getPort(),
return session.auth().verify().isSuccess(); baseConnectionParam.getUser(),
baseConnectionParam.getPassword(),
baseConnectionParam.getPublicKey())) {
return sshClientWrapper.isAuth();
} catch (Exception e) { } catch (Exception e) {
log.error("ssh test connection failed", e);
return false; return false;
} }
} }

30
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-ssh/src/test/java/org/apache/dolphinscheduler/plugin/datasource/ssh/SSHDataSourceProcessorTest.java

@ -17,9 +17,6 @@
package org.apache.dolphinscheduler.plugin.datasource.ssh; package org.apache.dolphinscheduler.plugin.datasource.ssh;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam; import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam;
import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourceParamDTO;
@ -27,15 +24,11 @@ import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourceProc
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.sshd.client.session.ClientSession;
import java.io.IOException;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic; import org.mockito.MockedConstruction;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
@ -100,20 +93,19 @@ public class SSHDataSourceProcessorTest {
} }
@Test @Test
void testTestConnection() throws IOException { void testTestConnection() {
SSHDataSourceParamDTO sshDataSourceParamDTO = SSHDataSourceParamDTO sshDataSourceParamDTO =
(SSHDataSourceParamDTO) sshDataSourceProcessor.createDatasourceParamDTO(connectJson); (SSHDataSourceParamDTO) sshDataSourceProcessor.createDatasourceParamDTO(connectJson);
ConnectionParam connectionParam = sshDataSourceProcessor.createConnectionParams(sshDataSourceParamDTO); ConnectionParam connectionParam = sshDataSourceProcessor.createConnectionParams(sshDataSourceParamDTO);
MockedStatic<SSHUtils> sshConnectionUtilsMockedStatic = org.mockito.Mockito.mockStatic(SSHUtils.class); Assertions.assertFalse(sshDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
sshConnectionUtilsMockedStatic.when(() -> SSHUtils.getSession(Mockito.any(), Mockito.any())).thenReturn(null);
Assertions.assertFalse(sshDataSourceProcessor.testConnection(connectionParam)); try (
MockedConstruction<SshClientWrapper> sshClientWrapperMockedConstruction =
ClientSession clientSession = Mockito.mock(ClientSession.class, RETURNS_DEEP_STUBS); Mockito.mockConstruction(SshClientWrapper.class, (mock, context) -> {
sshConnectionUtilsMockedStatic.when(() -> SSHUtils.getSession(Mockito.any(), Mockito.any())) Mockito.when(mock.isAuth()).thenReturn(true);
.thenReturn(clientSession); })) {
when(clientSession.auth().verify().isSuccess()).thenReturn(true); Assertions.assertTrue(sshDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
Assertions.assertTrue(sshDataSourceProcessor.testConnection(connectionParam)); }
} }
} }

Loading…
Cancel
Save