From 74a4de43b9b4ac1c0ad7d38fd9f4fc829848315c Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Thu, 14 Jan 2021 10:57:41 +0800 Subject: [PATCH] rpc --- .../remote/config/NettyServerConfig.java | 2 +- .../remote/decoder/NettyDecoder.java | 1 + .../remote/decoder/NettyEncoder.java | 6 +- .../dolphinscheduler/remote/rpc/MainTest.java | 24 +- .../remote/rpc/client/RpcRequestTable.java | 28 +++ .../remote/rpc/filter/SelectorFilter.java | 3 +- .../remote/rpc/future/RpcFuture.java | 52 +++++ .../remote/rpc/remote/NettyChannel.java | 8 - .../remote/rpc/remote/NettyClient.java | 214 ++++++++++++++++++ .../remote/rpc/remote/NettyClientHandler.java | 52 ++++- .../remote/rpc/remote/NettyServer.java | 187 +++++++++++++++ .../remote/rpc/remote/NettyServerHandler.java | 61 +++++ 12 files changed, 608 insertions(+), 30 deletions(-) create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyChannel.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java index 4ec8a0f7a7..cdee158d7c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java @@ -56,7 +56,7 @@ public class NettyServerConfig { /** * listen port */ - private int listenPort = 12346; + private int listenPort = 12366; public int getListenPort() { return listenPort; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java index f212b77389..e8b31e9b7c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java @@ -23,6 +23,7 @@ public class NettyDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + System.out.println("decoder"); if (byteBuf.readableBytes() < 4) { return; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java index 3ba5781fb4..7a4dc6dc27 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java @@ -4,13 +4,14 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; /** * @author jiangli * @date 2021-01-12 18:52 */ -public class NettyEncoder extends MessageToByteEncoder { +public class NettyEncoder extends MessageToByteEncoder { private Class genericClass; @@ -21,12 +22,13 @@ public class NettyEncoder extends MessageToByteEncoder { } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { - + System.out.println("encsss"); if (genericClass.isInstance(o)) { byte[] data = ProtoStuffUtils.serialize(o); byteBuf.writeInt(data.length); byteBuf.writeBytes(data); } + } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java index fbf499eb48..204d9d4150 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java @@ -1,6 +1,12 @@ package org.apache.dolphinscheduler.remote.rpc; -import org.apache.dolphinscheduler.remote.rpc.client.RpcClient; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; + +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; +import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer; +import org.apache.dolphinscheduler.remote.utils.Host; /** * @author jiangli @@ -9,12 +15,18 @@ import org.apache.dolphinscheduler.remote.rpc.client.RpcClient; public class MainTest { public static void main(String[] args) throws Exception { + NettyServer nettyServer=new NettyServer(new NettyServerConfig()); + + NettyClient nettyClient=new NettyClient(new NettyClientConfig()); + + Host host=new Host("127.0.0.1",12366); + RpcRequest rpcRequest=new RpcRequest(); + rpcRequest.setRequestId("988"); + rpcRequest.setClassName("kris"); + rpcRequest.setMethodName("ll"); + - RpcClient rpcClient = new RpcClient(); - IUserService userService = rpcClient.create(IUserService.class); - for (int i = 0; i < 100; i++) { - userService.say(); - } + nettyClient.sendMsg(host,rpcRequest); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java new file mode 100644 index 0000000000..677a2997e0 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java @@ -0,0 +1,28 @@ +package org.apache.dolphinscheduler.remote.rpc.client; + +import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author jiangli + * @date 2021-01-14 10:42 + */ +public class RpcRequestTable { + + // key: requestId value: RpcFuture + private static ConcurrentHashMap processingRpc = new ConcurrentHashMap<>(); + + public static void put(String requestId,RpcFuture rpcFuture){ + processingRpc.put(requestId,rpcFuture); + } + + public static RpcFuture get(String requestId){ + return processingRpc.get(requestId); + } + + public static void remove(String requestId){ + processingRpc.remove(requestId); + } + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java index 30aa32e9f2..8e8214a59f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java @@ -3,9 +3,10 @@ package org.apache.dolphinscheduler.remote.rpc.filter; import org.apache.dolphinscheduler.remote.rpc.Invoker; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.remote.rpc.directory.Directory; import org.apache.dolphinscheduler.remote.rpc.selector.RandomSelector; import org.apache.dolphinscheduler.remote.utils.Host; -im + import java.util.ArrayList; import java.util.List; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java new file mode 100644 index 0000000000..516e05b917 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java @@ -0,0 +1,52 @@ +package org.apache.dolphinscheduler.remote.rpc.future; + +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author jiangli + * @date 2021-01-14 09:24 + */ +public class RpcFuture implements Future { + + private CountDownLatch latch = new CountDownLatch(1); + + private RpcResponse response; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + boolean b = latch.await(5,TimeUnit.SECONDS); + return response.getResult(); + } + + @Override + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + boolean b = latch.await(timeout,unit); + return response.getResult(); + } + + public void done(RpcResponse response){ + this.response = response; + latch.countDown(); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyChannel.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyChannel.java deleted file mode 100644 index a38f7df4a5..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyChannel.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.remote; - -/** - * @author jiangli - * @date 2021-01-13 13:51 - */ -public class NettyChannel { -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java new file mode 100644 index 0000000000..a4c74fdc41 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java @@ -0,0 +1,214 @@ +package org.apache.dolphinscheduler.remote.rpc.remote; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.CharsetUtil; + +import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; + +import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; +import org.apache.dolphinscheduler.remote.future.ResponseFuture; +import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; +import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; +import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.remote.utils.NettyUtils; + +import java.net.InetSocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author jiangli + * @date 2021-01-13 19:31 + */ +public class NettyClient { + + private final Logger logger = LoggerFactory.getLogger(NettyClient.class); + + /** + * worker group + */ + private final EventLoopGroup workerGroup; + + /** + * client config + */ + private final NettyClientConfig clientConfig; + + + /** + * client bootstrap + */ + private final Bootstrap bootstrap = new Bootstrap(); + + /** + * started flag + */ + private final AtomicBoolean isStarted = new AtomicBoolean(false); + + /** + * channels + */ + private final ConcurrentHashMap channels = new ConcurrentHashMap(128); + + /** + * get channel + */ + public Channel getChannel(Host host) { + Channel channel = channels.get(host); + if (channel != null && channel.isActive()) { + return channel; + } + return createChannel(host, true); + } + + /** + * create channel + * + * @param host host + * @param isSync sync flag + * @return channel + */ + public Channel createChannel(Host host, boolean isSync) { + ChannelFuture future; + try { + synchronized (bootstrap) { + future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort())); + } + if (isSync) { + future.sync(); + } + if (future.isSuccess()) { + Channel channel = future.channel(); + channels.put(host, channel); + return channel; + } + } catch (Exception ex) { + logger.warn(String.format("connect to %s error", host), ex); + } + return null; + } + + /** + * client init + * + * @param clientConfig client config + */ + public NettyClient(final NettyClientConfig clientConfig) { + this.clientConfig = clientConfig; + if (NettyUtils.useEpoll()) { + this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); + } + }); + } else { + this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); + } + }); + } + this.start(); + + } + + /** + * start + */ + private void start() { + + this.bootstrap + .group(this.workerGroup) + .channel(NettyUtils.getSocketChannelClass()) + .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) + .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) + .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) + .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ch.pipeline() + .addLast(new NettyEncoder(RpcRequest.class)) //OUT - 1 + .addLast(new NettyDecoder(RpcResponse.class)) + .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) + + .addLast(new NettyClientHandler()); + } + }); + + isStarted.compareAndSet(false, true); + System.out.println("netty client start"); + } + + public void sendMsg(Host host, RpcRequest request) { + Channel channel = getChannel(host); + assert channel != null; + // ctx.writeAndFlush(Unpooled.copiedBuffer + RpcFuture future = new RpcFuture(); + RpcRequestTable.put(request.getRequestId(), future); + + channel.writeAndFlush(request); + // System.out.println(); + // channel.writeAndFlush( ProtoStuffUtils.serialize(request)); + + } + + + /** + * close + */ + public void close() { + if (isStarted.compareAndSet(true, false)) { + try { + closeChannels(); + if (workerGroup != null) { + this.workerGroup.shutdownGracefully(); + } + } catch (Exception ex) { + logger.error("netty client close exception", ex); + } + logger.info("netty client closed"); + } + } + + /** + * close channels + */ + private void closeChannels() { + for (Channel channel : this.channels.values()) { + channel.close(); + } + this.channels.clear(); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java index 76b5f313b5..4708c1096a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java @@ -1,14 +1,24 @@ package org.apache.dolphinscheduler.remote.rpc.remote; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; -import org.apache.dolphinscheduler.remote.utils.ChannelUtils; +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import java.net.InetSocketAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * @author jiangli * @date 2021-01-13 13:33 @@ -17,31 +27,49 @@ import java.net.InetSocketAddress; public class NettyClientHandler extends ChannelInboundHandlerAdapter { - @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - super.channelRegistered(ctx); - } + private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelActive(ChannelHandlerContext ctx) { - ctx.channel().close(); + // ctx.channel().close(); } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress(); + public void channelInactive(ChannelHandlerContext ctx) { + System.out.println("client 关闭channel"); + InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); ctx.channel().close(); - //todo connectManage.removeChannel(ctx.channel()); + //todo connectManage.removeChannel(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - super.channelRead(ctx, msg); + System.out.println("收到消息"); + RpcResponse rsp = (RpcResponse) msg; + System.out.println(rsp); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - super.userEventTriggered(ctx, evt); + + if (evt instanceof IdleStateEvent){ + IdleStateEvent event = (IdleStateEvent)evt; + if (event.state()== IdleState.ALL_IDLE){ + RpcRequest request = new RpcRequest(); + request.setMethodName("heartBeat"); + ctx.channel().writeAndFlush(request); + logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息..."); + } + }else{ + super.userEventTriggered(ctx,evt); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + System.out.println("exceptionCaught"); + logger.error("exceptionCaught : {}", cause.getMessage(), cause); + ctx.channel().close(); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java new file mode 100644 index 0000000000..41495f11c8 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java @@ -0,0 +1,187 @@ +package org.apache.dolphinscheduler.remote.rpc.remote; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; + +import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; + +import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; + +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.remote.utils.NettyUtils; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author jiangli + * @date 2021-01-13 19:32 + */ +public class NettyServer { + + private static final Logger logger =LoggerFactory.getLogger(NettyServer.class); + + /** + * boss group + */ + private final EventLoopGroup bossGroup; + + /** + * worker group + */ + private final EventLoopGroup workGroup; + + /** + * server config + */ + private final NettyServerConfig serverConfig; + + /** + * server bootstrap + */ + private final ServerBootstrap serverBootstrap = new ServerBootstrap(); + + /** + * started flag + */ + private final AtomicBoolean isStarted = new AtomicBoolean(false); + + /** + * server init + * + * @param serverConfig server config + */ + public NettyServer(final NettyServerConfig serverConfig) { + this.serverConfig = serverConfig; + if (NettyUtils.useEpoll()) { + this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); + } + }); + + this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); + } + }); + } else { + this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); + } + }); + + this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); + } + }); + } + this.start(); + } + + + /** + * server start + */ + public void start() { + if (isStarted.compareAndSet(false, true)) { + this.serverBootstrap + .group(this.bossGroup, this.workGroup) + .channel(NettyUtils.getServerSocketChannelClass()) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) + .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) + .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) + .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) + .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + initNettyChannel(ch); + } + }); + + ChannelFuture future; + try { + future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); + } catch (Exception e) { + //logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); + throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + } + if (future.isSuccess()) { + // logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); + } else if (future.cause() != null) { + throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); + } else { + throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + } + } + System.out.println("netty ser ver start"); + } + + /** + * init netty channel + * + * @param ch socket channel + */ + private void initNettyChannel(SocketChannel ch) { + ch.pipeline() + .addLast(new NettyDecoder(RpcRequest.class)) + .addLast(new NettyEncoder(RpcResponse.class)) + .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) + .addLast("handler", new NettyServerHandler()); + } + + + public void close() { + if (isStarted.compareAndSet(true, false)) { + try { + if (bossGroup != null) { + this.bossGroup.shutdownGracefully(); + } + if (workGroup != null) { + this.workGroup.shutdownGracefully(); + } + + } catch (Exception ex) { + logger.error("netty server close exception", ex); + } + logger.info("netty server closed"); + } + } + + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java new file mode 100644 index 0000000000..9ee9ff2197 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java @@ -0,0 +1,61 @@ +package org.apache.dolphinscheduler.remote.rpc.remote; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; + +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author jiangli + * @date 2021-01-13 19:20 + */ +public class NettyServerHandler extends ChannelInboundHandlerAdapter { + + private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + super.channelRegistered(ctx); + } + + + @Override + public void channelInactive(ChannelHandlerContext ctx){ + logger.info("channel close"); + ctx.channel().close(); + } + + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + System.out.println("客户端连接成功!"+ctx.channel().remoteAddress()); + logger.info("客户端连接成功!"+ctx.channel().remoteAddress()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + logger.info("server read msg"); + System.out.println("收到消息"); + RpcRequest req= (RpcRequest) msg; + System.out.println(req.getRequestId()); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + ctx.channel().close(); + } else { + super.userEventTriggered(ctx, evt); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + System.out.println("exceptionCaught"); + logger.error("exceptionCaught : {}", cause.getMessage(), cause); + ctx.channel().close(); + } +}