From 5463d0255a71212668606588c10189c2fb0185d2 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 20 May 2024 12:10:33 +0800 Subject: [PATCH] Fix rpc channel leak due to concurrent operation (#16021) * Fix rpc channel leak due to concurrent operation * Throw channel create failed exception --------- Co-authored-by: Rick Cheng --- .../base/client/NettyRemotingClient.java | 60 ++++++++++++------- .../extract/base/utils/HostTest.java | 31 ++++++++++ 2 files changed, 70 insertions(+), 21 deletions(-) create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java index 3999f5c9f5..dafaae311d 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java @@ -31,10 +31,12 @@ import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.extract.base.utils.NettyUtils; import java.net.InetSocketAddress; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; import io.netty.bootstrap.Bootstrap; @@ -54,7 +56,8 @@ public class NettyRemotingClient implements AutoCloseable { private final Bootstrap bootstrap = new Bootstrap(); - private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128); + 
private final ReentrantLock channelsLock = new ReentrantLock(); + private final Map<Host, Channel> channels = new ConcurrentHashMap<>(); private final AtomicBoolean isStarted = new AtomicBoolean(false); @@ -104,9 +107,10 @@ public class NettyRemotingClient implements AutoCloseable { isStarted.compareAndSet(false, true); } - public IRpcResponse sendSync(final Host host, final Transporter transporter, + public IRpcResponse sendSync(final Host host, + final Transporter transporter, final long timeoutMillis) throws InterruptedException, RemotingException { - final Channel channel = getChannel(host); + final Channel channel = getOrCreateChannel(host); if (channel == null) { throw new RemotingException(String.format("connect to : %s fail", host)); } @@ -137,36 +141,43 @@ public class NettyRemotingClient implements AutoCloseable { return iRpcResponse; } - private Channel getChannel(Host host) { + private Channel getOrCreateChannel(Host host) { Channel channel = channels.get(host); if (channel != null && channel.isActive()) { return channel; } - return createChannel(host, true); + try { + channelsLock.lock(); + channel = channels.get(host); + if (channel != null && channel.isActive()) { + return channel; + } + channel = createChannel(host); + channels.put(host, channel); + } finally { + channelsLock.unlock(); + } + return channel; } /** * create channel * - * @param host host - * @param isSync sync flag + * @param host host * @return channel */ - private Channel createChannel(Host host, boolean isSync) { + private Channel createChannel(Host host) { try { ChannelFuture future; synchronized (bootstrap) { future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort())); } - if (isSync) { - future.sync(); - } + future.await(clientConfig.getConnectTimeoutMillis()); if (future.isSuccess()) { - Channel channel = future.channel(); - channels.put(host, channel); - return channel; + return future.channel(); + } else { + throw new IllegalArgumentException("connect to host: " + host + 
" failed", future.cause()); } - throw new IllegalArgumentException("connect to host: " + host + " failed"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Connect to host: " + host + " failed", e); @@ -189,16 +200,23 @@ public class NettyRemotingClient implements AutoCloseable { } private void closeChannels() { - for (Channel channel : this.channels.values()) { - channel.close(); + try { + channelsLock.lock(); + channels.values().forEach(Channel::close); + } finally { + channelsLock.unlock(); } - this.channels.clear(); } public void closeChannel(Host host) { - Channel channel = this.channels.remove(host); - if (channel != null) { - channel.close(); + try { + channelsLock.lock(); + Channel channel = this.channels.remove(host); + if (channel != null) { + channel.close(); + } + } finally { + channelsLock.unlock(); } } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java new file mode 100644 index 0000000000..5bd1a0a0ba --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.base.utils; + +import org.junit.jupiter.api.Test; + +import com.google.common.truth.Truth; + +class HostTest { + + @Test + void testEquals() { + Truth.assertThat(Host.of("localhost:8080")).isEqualTo(Host.of("localhost:8080")); + } + +}