Browse Source

Fix rpc channel leak due to concurrent operation (#16021)

* Fix rpc channel leak due to concurrent operation

* Throw channel create failed exception

---------

Co-authored-by: Rick Cheng <rickchengx@gmail.com>
upstream-dev
Wenjun Ruan 6 months ago committed by GitHub
parent
commit
5463d0255a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 60
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
  2. 31
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java

60
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java

@ -31,10 +31,12 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.Bootstrap;
@ -54,7 +56,8 @@ public class NettyRemotingClient implements AutoCloseable {
private final Bootstrap bootstrap = new Bootstrap();
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
private final ReentrantLock channelsLock = new ReentrantLock();
private final Map<Host, Channel> channels = new ConcurrentHashMap<>();
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@ -104,9 +107,10 @@ public class NettyRemotingClient implements AutoCloseable {
isStarted.compareAndSet(false, true);
}
public IRpcResponse sendSync(final Host host, final Transporter transporter,
public IRpcResponse sendSync(final Host host,
final Transporter transporter,
final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(host);
final Channel channel = getOrCreateChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
@ -137,36 +141,43 @@ public class NettyRemotingClient implements AutoCloseable {
return iRpcResponse;
}
private Channel getChannel(Host host) {
private Channel getOrCreateChannel(Host host) {
Channel channel = channels.get(host);
if (channel != null && channel.isActive()) {
return channel;
}
return createChannel(host, true);
try {
channelsLock.lock();
channel = channels.get(host);
if (channel != null && channel.isActive()) {
return channel;
}
channel = createChannel(host);
channels.put(host, channel);
} finally {
channelsLock.unlock();
}
return channel;
}
/**
* create channel
*
* @param host host
* @param isSync sync flag
* @param host host
* @return channel
*/
private Channel createChannel(Host host, boolean isSync) {
private Channel createChannel(Host host) {
try {
ChannelFuture future;
synchronized (bootstrap) {
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
}
if (isSync) {
future.sync();
}
future.await(clientConfig.getConnectTimeoutMillis());
if (future.isSuccess()) {
Channel channel = future.channel();
channels.put(host, channel);
return channel;
return future.channel();
} else {
throw new IllegalArgumentException("connect to host: " + host + " failed", future.cause());
}
throw new IllegalArgumentException("connect to host: " + host + " failed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Connect to host: " + host + " failed", e);
@ -189,16 +200,23 @@ public class NettyRemotingClient implements AutoCloseable {
}
private void closeChannels() {
for (Channel channel : this.channels.values()) {
channel.close();
try {
channelsLock.lock();
channels.values().forEach(Channel::close);
} finally {
channelsLock.unlock();
}
this.channels.clear();
}
public void closeChannel(Host host) {
Channel channel = this.channels.remove(host);
if (channel != null) {
channel.close();
try {
channelsLock.lock();
Channel channel = this.channels.remove(host);
if (channel != null) {
channel.close();
}
} finally {
channelsLock.unlock();
}
}
}

31
dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.base.utils;
import org.junit.jupiter.api.Test;
import com.google.common.truth.Truth;
class HostTest {
@Test
void testEquals() {
Truth.assertThat(Host.of("localhost:8080")).isEqualTo(Host.of("localhost:8080"));
}
}
Loading…
Cancel
Save