@ -18,10 +18,16 @@
package org.apache.dolphinscheduler.remote ;
import io.netty.bootstrap.Bootstrap ;
import io.netty.channel.* ;
import io.netty.channel.Channel ;
import io.netty.channel.ChannelFuture ;
import io.netty.channel.ChannelFutureListener ;
import io.netty.channel.ChannelInitializer ;
import io.netty.channel.ChannelOption ;
import io.netty.channel.EventLoopGroup ;
import io.netty.channel.epoll.EpollEventLoopGroup ;
import io.netty.channel.nio.NioEventLoopGroup ;
import io.netty.channel.socket.SocketChannel ;
import io.netty.channel.socket.nio.NioSocketChannel ;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder ;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder ;
import org.apache.dolphinscheduler.remote.command.Command ;
@ -38,6 +44,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy ;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory ;
import org.apache.dolphinscheduler.remote.utils.NettyUtils ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
@ -47,7 +55,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger ;
/ * *
* remoting netty client
* remoting netty client
* /
public class NettyRemotingClient {
@ -59,7 +67,7 @@ public class NettyRemotingClient {
private final Bootstrap bootstrap = new Bootstrap ( ) ;
/ * *
* encoder
* encoder
* /
private final NettyEncoder encoder = new NettyEncoder ( ) ;
@ -69,57 +77,69 @@ public class NettyRemotingClient {
private final ConcurrentHashMap < Host , Channel > channels = new ConcurrentHashMap ( 128 ) ;
/ * *
* started flag
* started flag
* /
private final AtomicBoolean isStarted = new AtomicBoolean ( false ) ;
/ * *
* worker group
* worker group
* /
private final Nio EventLoopGroup workerGroup ;
private final EventLoopGroup workerGroup ;
/ * *
* client config
* client config
* /
private final NettyClientConfig clientConfig ;
/ * *
* saync semaphore
* saync semaphore
* /
private final Semaphore asyncSemaphore = new Semaphore ( 200 , true ) ;
/ * *
* callback thread executor
* callback thread executor
* /
private final ExecutorService callbackExecutor ;
/ * *
* client handler
* client handler
* /
private final NettyClientHandler clientHandler ;
/ * *
* response future executor
* response future executor
* /
private final ScheduledExecutorService responseFutureExecutor ;
/ * *
* client init
* client init
*
* @param clientConfig client config
* /
public NettyRemotingClient ( final NettyClientConfig clientConfig ) {
public NettyRemotingClient ( final NettyClientConfig clientConfig ) {
this . clientConfig = clientConfig ;
this . workerGroup = new NioEventLoopGroup ( clientConfig . getWorkerThreads ( ) , new ThreadFactory ( ) {
private AtomicInteger threadIndex = new AtomicInteger ( 0 ) ;
if ( NettyUtils . useEpoll ( ) ) {
this . workerGroup = new EpollEventLoopGroup ( clientConfig . getWorkerThreads ( ) , new ThreadFactory ( ) {
private AtomicInteger threadIndex = new AtomicInteger ( 0 ) ;
@Override
public Thread newThread ( Runnable r ) {
return new Thread ( r , String . format ( "NettyClient_%d" , this . threadIndex . incrementAndGet ( ) ) ) ;
}
} ) ;
@Override
public Thread newThread ( Runnable r ) {
return new Thread ( r , String . format ( "NettyClient_%d" , this . threadIndex . incrementAndGet ( ) ) ) ;
}
} ) ;
} else {
this . workerGroup = new NioEventLoopGroup ( clientConfig . getWorkerThreads ( ) , new ThreadFactory ( ) {
private AtomicInteger threadIndex = new AtomicInteger ( 0 ) ;
@Override
public Thread newThread ( Runnable r ) {
return new Thread ( r , String . format ( "NettyClient_%d" , this . threadIndex . incrementAndGet ( ) ) ) ;
}
} ) ;
}
this . callbackExecutor = new ThreadPoolExecutor ( 5 , 10 , 1 , TimeUnit . MINUTES ,
new LinkedBlockingQueue < > ( 1000 ) , new NamedThreadFactory ( "CallbackExecutor" , 10 ) ,
new CallerThreadExecutePolicy ( ) ) ;
new LinkedBlockingQueue < > ( 1000 ) , new NamedThreadFactory ( "CallbackExecutor" , 10 ) ,
new CallerThreadExecutePolicy ( ) ) ;
this . clientHandler = new NettyClientHandler ( this , callbackExecutor ) ;
this . responseFutureExecutor = Executors . newSingleThreadScheduledExecutor ( new NamedThreadFactory ( "ResponseFutureExecutor" ) ) ;
@ -128,26 +148,27 @@ public class NettyRemotingClient {
}
/ * *
* start
* start
* /
private void start ( ) {
private void start ( ) {
this . bootstrap
. group ( this . workerGroup )
. channel ( NioSocketChannel . class )
. option ( ChannelOption . SO_KEEPALIVE , clientConfig . isSoKeepalive ( ) )
. option ( ChannelOption . TCP_NODELAY , clientConfig . isTcpNoDelay ( ) )
. option ( ChannelOption . SO_SNDBUF , clientConfig . getSendBufferSize ( ) )
. option ( ChannelOption . SO_RCVBUF , clientConfig . getReceiveBufferSize ( ) )
. handler ( new ChannelInitializer < SocketChannel > ( ) {
@Override
public void initChannel ( SocketChannel ch ) throws Exception {
ch . pipeline ( ) . addLast (
new NettyDecoder ( ) ,
clientHandler ,
encoder ) ;
}
} ) ;
. group ( this . workerGroup )
. channel ( NettyUtils . getSocketChannelClass ( ) )
. option ( ChannelOption . SO_KEEPALIVE , clientConfig . isSoKeepalive ( ) )
. option ( ChannelOption . TCP_NODELAY , clientConfig . isTcpNoDelay ( ) )
. option ( ChannelOption . SO_SNDBUF , clientConfig . getSendBufferSize ( ) )
. option ( ChannelOption . SO_RCVBUF , clientConfig . getReceiveBufferSize ( ) )
. option ( ChannelOption . CONNECT_TIMEOUT_MILLIS , clientConfig . getConnectTimeoutMillis ( ) )
. handler ( new ChannelInitializer < SocketChannel > ( ) {
@Override
public void initChannel ( SocketChannel ch ) throws Exception {
ch . pipeline ( ) . addLast (
new NettyDecoder ( ) ,
clientHandler ,
encoder ) ;
}
} ) ;
this . responseFutureExecutor . scheduleAtFixedRate ( new Runnable ( ) {
@Override
public void run ( ) {
@ -159,10 +180,11 @@ public class NettyRemotingClient {
}
/ * *
* async send
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* async send
*
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @param invokeCallback callback function
* @throws InterruptedException
* @throws RemotingException
@ -182,22 +204,22 @@ public class NettyRemotingClient {
* control concurrency number
* /
boolean acquired = this . asyncSemaphore . tryAcquire ( timeoutMillis , TimeUnit . MILLISECONDS ) ;
if ( acquired ) {
if ( acquired ) {
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore ( this . asyncSemaphore ) ;
/ * *
* response future
* /
final ResponseFuture responseFuture = new ResponseFuture ( opaque ,
timeoutMillis ,
invokeCallback ,
releaseSemaphore ) ;
timeoutMillis ,
invokeCallback ,
releaseSemaphore ) ;
try {
channel . writeAndFlush ( command ) . addListener ( new ChannelFutureListener ( ) {
channel . writeAndFlush ( command ) . addListener ( new ChannelFutureListener ( ) {
@Override
public void operationComplete ( ChannelFuture future ) throws Exception {
if ( future . isSuccess ( ) ) {
if ( future . isSuccess ( ) ) {
responseFuture . setSendOk ( true ) ;
return ;
} else {
@ -207,28 +229,29 @@ public class NettyRemotingClient {
responseFuture . putResponse ( null ) ;
try {
responseFuture . executeInvokeCallback ( ) ;
} catch ( Throwable ex ) {
} catch ( Throwable ex ) {
logger . error ( "execute callback error" , ex ) ;
} finally {
} finally {
responseFuture . release ( ) ;
}
}
} ) ;
} catch ( Throwable ex ) {
} catch ( Throwable ex ) {
responseFuture . release ( ) ;
throw new RemotingException ( String . format ( "send command to host: %s failed" , host ) , ex ) ;
}
} else {
} else {
String message = String . format ( "try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d" ,
timeoutMillis , asyncSemaphore . getQueueLength ( ) , asyncSemaphore . availablePermits ( ) ) ;
timeoutMillis , asyncSemaphore . getQueueLength ( ) , asyncSemaphore . availablePermits ( ) ) ;
throw new RemotingTooMuchRequestException ( message ) ;
}
}
/ * *
* sync send
* @param host host
* @param command command
*
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @return command
* @throws InterruptedException
@ -244,7 +267,7 @@ public class NettyRemotingClient {
channel . writeAndFlush ( command ) . addListener ( new ChannelFutureListener ( ) {
@Override
public void operationComplete ( ChannelFuture future ) throws Exception {
if ( future . isSuccess ( ) ) {
if ( future . isSuccess ( ) ) {
responseFuture . setSendOk ( true ) ;
return ;
} else {
@ -259,10 +282,10 @@ public class NettyRemotingClient {
* sync wait for result
* /
Command result = responseFuture . waitResponse ( ) ;
if ( result = = null ) {
if ( responseFuture . isSendOK ( ) ) {
if ( result = = null ) {
if ( responseFuture . isSendOK ( ) ) {
throw new RemotingTimeoutException ( host . toString ( ) , timeoutMillis , responseFuture . getCause ( ) ) ;
} else {
} else {
throw new RemotingException ( host . toString ( ) , responseFuture . getCause ( ) ) ;
}
}
@ -270,8 +293,9 @@ public class NettyRemotingClient {
}
/ * *
* send task
* @param host host
* send task
*
* @param host host
* @param command command
* @throws RemotingException
* /
@ -296,33 +320,35 @@ public class NettyRemotingClient {
}
/ * *
* register processor
* register processor
*
* @param commandType command type
* @param processor processor
* @param processor processor
* /
public void registerProcessor ( final CommandType commandType , final NettyRequestProcessor processor ) {
this . registerProcessor ( commandType , processor , null ) ;
}
/ * *
* register processor
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
* @param processor processor
* @param executor thread executor
* /
public void registerProcessor ( final CommandType commandType , final NettyRequestProcessor processor , final ExecutorService executor ) {
this . clientHandler . registerProcessor ( commandType , processor , executor ) ;
}
/ * *
* get channel
* get channel
*
* @param host
* @return
* /
public Channel getChannel ( Host host ) {
Channel channel = channels . get ( host ) ;
if ( channel ! = null & & channel . isActive ( ) ) {
if ( channel ! = null & & channel . isActive ( ) ) {
return channel ;
}
return createChannel ( host , true ) ;
@ -330,17 +356,18 @@ public class NettyRemotingClient {
/ * *
* create channel
* @param host host
*
* @param host host
* @param isSync sync flag
* @return channel
* /
public Channel createChannel ( Host host , boolean isSync ) {
ChannelFuture future ;
try {
synchronized ( bootstrap ) {
synchronized ( bootstrap ) {
future = bootstrap . connect ( new InetSocketAddress ( host . getIp ( ) , host . getPort ( ) ) ) ;
}
if ( isSync ) {
if ( isSync ) {
future . sync ( ) ;
}
if ( future . isSuccess ( ) ) {
@ -358,16 +385,16 @@ public class NettyRemotingClient {
* close
* /
public void close ( ) {
if ( isStarted . compareAndSet ( true , false ) ) {
if ( isStarted . compareAndSet ( true , false ) ) {
try {
closeChannels ( ) ;
if ( workerGroup ! = null ) {
if ( workerGroup ! = null ) {
this . workerGroup . shutdownGracefully ( ) ;
}
if ( callbackExecutor ! = null ) {
if ( callbackExecutor ! = null ) {
this . callbackExecutor . shutdownNow ( ) ;
}
if ( this . responseFutureExecutor ! = null ) {
if ( this . responseFutureExecutor ! = null ) {
this . responseFutureExecutor . shutdownNow ( ) ;
}
} catch ( Exception ex ) {
@ -378,9 +405,9 @@ public class NettyRemotingClient {
}
/ * *
* close channels
* close channels
* /
private void closeChannels ( ) {
private void closeChannels ( ) {
for ( Channel channel : this . channels . values ( ) ) {
channel . close ( ) ;
}
@ -389,11 +416,12 @@ public class NettyRemotingClient {
/ * *
* close channel
*
* @param host host
* /
public void closeChannel ( Host host ) {
public void closeChannel ( Host host ) {
Channel channel = this . channels . remove ( host ) ;
if ( channel ! = null ) {
if ( channel ! = null ) {
channel . close ( ) ;
}
}