CalvinKirs
4 years ago
12 changed files with 608 additions and 30 deletions
@ -0,0 +1,28 @@
|
||||
package org.apache.dolphinscheduler.remote.rpc.client; |
||||
|
||||
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
/** |
||||
* @author jiangli |
||||
* @date 2021-01-14 10:42 |
||||
*/ |
||||
public class RpcRequestTable { |
||||
|
||||
// key: requestId value: RpcFuture
|
||||
private static ConcurrentHashMap<String, RpcFuture> processingRpc = new ConcurrentHashMap<>(); |
||||
|
||||
public static void put(String requestId,RpcFuture rpcFuture){ |
||||
processingRpc.put(requestId,rpcFuture); |
||||
} |
||||
|
||||
public static RpcFuture get(String requestId){ |
||||
return processingRpc.get(requestId); |
||||
} |
||||
|
||||
public static void remove(String requestId){ |
||||
processingRpc.remove(requestId); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,52 @@
|
||||
package org.apache.dolphinscheduler.remote.rpc.future; |
||||
|
||||
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; |
||||
|
||||
import java.util.concurrent.CountDownLatch; |
||||
import java.util.concurrent.ExecutionException; |
||||
import java.util.concurrent.Future; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.TimeoutException; |
||||
|
||||
/** |
||||
* @author jiangli |
||||
* @date 2021-01-14 09:24 |
||||
*/ |
||||
public class RpcFuture implements Future<Object> { |
||||
|
||||
private CountDownLatch latch = new CountDownLatch(1); |
||||
|
||||
private RpcResponse response; |
||||
|
||||
@Override |
||||
public boolean cancel(boolean mayInterruptIfRunning) { |
||||
return false; |
||||
} |
||||
|
||||
@Override |
||||
public boolean isCancelled() { |
||||
return false; |
||||
} |
||||
|
||||
@Override |
||||
public boolean isDone() { |
||||
return false; |
||||
} |
||||
|
||||
@Override |
||||
public Object get() throws InterruptedException, ExecutionException { |
||||
boolean b = latch.await(5,TimeUnit.SECONDS); |
||||
return response.getResult(); |
||||
} |
||||
|
||||
@Override |
||||
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
||||
boolean b = latch.await(timeout,unit); |
||||
return response.getResult(); |
||||
} |
||||
|
||||
public void done(RpcResponse response){ |
||||
this.response = response; |
||||
latch.countDown(); |
||||
} |
||||
} |
@ -1,8 +0,0 @@
|
||||
package org.apache.dolphinscheduler.remote.rpc.remote; |
||||
|
||||
/** |
||||
* @author jiangli |
||||
* @date 2021-01-13 13:51 |
||||
*/ |
||||
public class NettyChannel { |
||||
} |
@ -0,0 +1,214 @@
|
||||
package org.apache.dolphinscheduler.remote.rpc.remote; |
||||
|
||||
import io.netty.bootstrap.Bootstrap; |
||||
import io.netty.buffer.Unpooled; |
||||
import io.netty.channel.Channel; |
||||
import io.netty.channel.ChannelFuture; |
||||
import io.netty.channel.ChannelInitializer; |
||||
import io.netty.channel.ChannelOption; |
||||
import io.netty.channel.EventLoopGroup; |
||||
import io.netty.channel.epoll.EpollEventLoopGroup; |
||||
import io.netty.channel.nio.NioEventLoopGroup; |
||||
import io.netty.channel.socket.SocketChannel; |
||||
import io.netty.handler.logging.LogLevel; |
||||
import io.netty.handler.logging.LoggingHandler; |
||||
import io.netty.handler.timeout.IdleStateHandler; |
||||
import io.netty.util.CharsetUtil; |
||||
|
||||
import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; |
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; |
||||
|
||||
import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; |
||||
import org.apache.dolphinscheduler.remote.future.ResponseFuture; |
||||
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; |
||||
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; |
||||
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; |
||||
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; |
||||
import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; |
||||
import org.apache.dolphinscheduler.remote.utils.Constants; |
||||
import org.apache.dolphinscheduler.remote.utils.Host; |
||||
import org.apache.dolphinscheduler.remote.utils.NettyUtils; |
||||
|
||||
import java.net.InetSocketAddress; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.ThreadFactory; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* @author jiangli |
||||
* @date 2021-01-13 19:31 |
||||
*/ |
||||
public class NettyClient { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(NettyClient.class); |
||||
|
||||
/** |
||||
* worker group |
||||
*/ |
||||
private final EventLoopGroup workerGroup; |
||||
|
||||
/** |
||||
* client config |
||||
*/ |
||||
private final NettyClientConfig clientConfig; |
||||
|
||||
|
||||
/** |
||||
* client bootstrap |
||||
*/ |
||||
private final Bootstrap bootstrap = new Bootstrap(); |
||||
|
||||
/** |
||||
* started flag |
||||
*/ |
||||
private final AtomicBoolean isStarted = new AtomicBoolean(false); |
||||
|
||||
/** |
||||
* channels |
||||
*/ |
||||
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128); |
||||
|
||||
/** |
||||
* get channel |
||||
*/ |
||||
public Channel getChannel(Host host) { |
||||
Channel channel = channels.get(host); |
||||
if (channel != null && channel.isActive()) { |
||||
return channel; |
||||
} |
||||
return createChannel(host, true); |
||||
} |
||||
|
||||
/** |
||||
* create channel |
||||
* |
||||
* @param host host |
||||
* @param isSync sync flag |
||||
* @return channel |
||||
*/ |
||||
public Channel createChannel(Host host, boolean isSync) { |
||||
ChannelFuture future; |
||||
try { |
||||
synchronized (bootstrap) { |
||||
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort())); |
||||
} |
||||
if (isSync) { |
||||
future.sync(); |
||||
} |
||||
if (future.isSuccess()) { |
||||
Channel channel = future.channel(); |
||||
channels.put(host, channel); |
||||
return channel; |
||||
} |
||||
} catch (Exception ex) { |
||||
logger.warn(String.format("connect to %s error", host), ex); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
/** |
||||
* client init |
||||
* |
||||
* @param clientConfig client config |
||||
*/ |
||||
public NettyClient(final NettyClientConfig clientConfig) { |
||||
this.clientConfig = clientConfig; |
||||
if (NettyUtils.useEpoll()) { |
||||
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { |
||||
private AtomicInteger threadIndex = new AtomicInteger(0); |
||||
|
||||
@Override |
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); |
||||
} |
||||
}); |
||||
} else { |
||||
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { |
||||
private AtomicInteger threadIndex = new AtomicInteger(0); |
||||
|
||||
@Override |
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); |
||||
} |
||||
}); |
||||
} |
||||
this.start(); |
||||
|
||||
} |
||||
|
||||
/** |
||||
* start |
||||
*/ |
||||
private void start() { |
||||
|
||||
this.bootstrap |
||||
.group(this.workerGroup) |
||||
.channel(NettyUtils.getSocketChannelClass()) |
||||
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) |
||||
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) |
||||
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) |
||||
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) |
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) |
||||
.handler(new LoggingHandler(LogLevel.DEBUG)) |
||||
.handler(new ChannelInitializer<SocketChannel>() { |
||||
@Override |
||||
public void initChannel(SocketChannel ch) { |
||||
ch.pipeline() |
||||
.addLast(new NettyEncoder(RpcRequest.class)) //OUT - 1
|
||||
.addLast(new NettyDecoder(RpcResponse.class)) |
||||
.addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) |
||||
|
||||
.addLast(new NettyClientHandler()); |
||||
} |
||||
}); |
||||
|
||||
isStarted.compareAndSet(false, true); |
||||
System.out.println("netty client start"); |
||||
} |
||||
|
||||
public void sendMsg(Host host, RpcRequest request) { |
||||
Channel channel = getChannel(host); |
||||
assert channel != null; |
||||
// ctx.writeAndFlush(Unpooled.copiedBuffer
|
||||
RpcFuture future = new RpcFuture(); |
||||
RpcRequestTable.put(request.getRequestId(), future); |
||||
|
||||
channel.writeAndFlush(request); |
||||
// System.out.println();
|
||||
// channel.writeAndFlush( ProtoStuffUtils.serialize(request));
|
||||
|
||||
} |
||||
|
||||
|
||||
/** |
||||
* close |
||||
*/ |
||||
public void close() { |
||||
if (isStarted.compareAndSet(true, false)) { |
||||
try { |
||||
closeChannels(); |
||||
if (workerGroup != null) { |
||||
this.workerGroup.shutdownGracefully(); |
||||
} |
||||
} catch (Exception ex) { |
||||
logger.error("netty client close exception", ex); |
||||
} |
||||
logger.info("netty client closed"); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* close channels |
||||
*/ |
||||
private void closeChannels() { |
||||
for (Channel channel : this.channels.values()) { |
||||
channel.close(); |
||||
} |
||||
this.channels.clear(); |
||||
} |
||||
} |
@ -0,0 +1,187 @@
|
||||
package org.apache.dolphinscheduler.remote.rpc.remote; |
||||
|
||||
import io.netty.bootstrap.ServerBootstrap; |
||||
import io.netty.channel.ChannelFuture; |
||||
import io.netty.channel.ChannelInitializer; |
||||
import io.netty.channel.ChannelOption; |
||||
import io.netty.channel.EventLoopGroup; |
||||
import io.netty.channel.epoll.EpollEventLoopGroup; |
||||
import io.netty.channel.nio.NioEventLoopGroup; |
||||
import io.netty.channel.socket.SocketChannel; |
||||
import io.netty.handler.logging.LogLevel; |
||||
import io.netty.handler.logging.LoggingHandler; |
||||
import io.netty.handler.timeout.IdleStateHandler; |
||||
|
||||
import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; |
||||
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; |
||||
|
||||
import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; |
||||
|
||||
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; |
||||
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; |
||||
import org.apache.dolphinscheduler.remote.utils.Constants; |
||||
import org.apache.dolphinscheduler.remote.utils.NettyUtils; |
||||
|
||||
import java.util.concurrent.ThreadFactory; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* @author jiangli |
||||
* @date 2021-01-13 19:32 |
||||
*/ |
||||
public class NettyServer { |
||||
|
||||
private static final Logger logger =LoggerFactory.getLogger(NettyServer.class); |
||||
|
||||
/** |
||||
* boss group |
||||
*/ |
||||
private final EventLoopGroup bossGroup; |
||||
|
||||
/** |
||||
* worker group |
||||
*/ |
||||
private final EventLoopGroup workGroup; |
||||
|
||||
/** |
||||
* server config |
||||
*/ |
||||
private final NettyServerConfig serverConfig; |
||||
|
||||
/** |
||||
* server bootstrap |
||||
*/ |
||||
private final ServerBootstrap serverBootstrap = new ServerBootstrap(); |
||||
|
||||
/** |
||||
* started flag |
||||
*/ |
||||
private final AtomicBoolean isStarted = new AtomicBoolean(false); |
||||
|
||||
/** |
||||
* server init |
||||
* |
||||
* @param serverConfig server config |
||||
*/ |
||||
public NettyServer(final NettyServerConfig serverConfig) { |
||||
this.serverConfig = serverConfig; |
||||
if (NettyUtils.useEpoll()) { |
||||
this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() { |
||||
private AtomicInteger threadIndex = new AtomicInteger(0); |
||||
|
||||
@Override |
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); |
||||
} |
||||
}); |
||||
|
||||
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { |
||||
private AtomicInteger threadIndex = new AtomicInteger(0); |
||||
|
||||
@Override |
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); |
||||
} |
||||
}); |
||||
} else { |
||||
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { |
||||
private AtomicInteger threadIndex = new AtomicInteger(0); |
||||
|
||||
@Override |
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); |
||||
} |
||||
}); |
||||
|
||||
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { |
||||
private AtomicInteger threadIndex = new AtomicInteger(0); |
||||
|
||||
@Override |
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); |
||||
} |
||||
}); |
||||
} |
||||
this.start(); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* server start |
||||
*/ |
||||
public void start() { |
||||
if (isStarted.compareAndSet(false, true)) { |
||||
this.serverBootstrap |
||||
.group(this.bossGroup, this.workGroup) |
||||
.channel(NettyUtils.getServerSocketChannelClass()) |
||||
.option(ChannelOption.SO_REUSEADDR, true) |
||||
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) |
||||
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) |
||||
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) |
||||
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) |
||||
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) |
||||
.handler(new LoggingHandler(LogLevel.DEBUG)) |
||||
.childHandler(new ChannelInitializer<SocketChannel>() { |
||||
|
||||
@Override |
||||
protected void initChannel(SocketChannel ch) throws Exception { |
||||
initNettyChannel(ch); |
||||
} |
||||
}); |
||||
|
||||
ChannelFuture future; |
||||
try { |
||||
future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); |
||||
} catch (Exception e) { |
||||
//logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
|
||||
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); |
||||
} |
||||
if (future.isSuccess()) { |
||||
// logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
|
||||
} else if (future.cause() != null) { |
||||
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); |
||||
} else { |
||||
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); |
||||
} |
||||
} |
||||
System.out.println("netty ser ver start"); |
||||
} |
||||
|
||||
/** |
||||
* init netty channel |
||||
* |
||||
* @param ch socket channel |
||||
*/ |
||||
private void initNettyChannel(SocketChannel ch) { |
||||
ch.pipeline() |
||||
.addLast(new NettyDecoder(RpcRequest.class)) |
||||
.addLast(new NettyEncoder(RpcResponse.class)) |
||||
.addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) |
||||
.addLast("handler", new NettyServerHandler()); |
||||
} |
||||
|
||||
|
||||
public void close() { |
||||
if (isStarted.compareAndSet(true, false)) { |
||||
try { |
||||
if (bossGroup != null) { |
||||
this.bossGroup.shutdownGracefully(); |
||||
} |
||||
if (workGroup != null) { |
||||
this.workGroup.shutdownGracefully(); |
||||
} |
||||
|
||||
} catch (Exception ex) { |
||||
logger.error("netty server close exception", ex); |
||||
} |
||||
logger.info("netty server closed"); |
||||
} |
||||
} |
||||
|
||||
|
||||
} |
@ -0,0 +1,61 @@
|
||||
package org.apache.dolphinscheduler.remote.rpc.remote; |
||||
|
||||
import io.netty.channel.ChannelHandlerContext; |
||||
import io.netty.channel.ChannelInboundHandlerAdapter; |
||||
import io.netty.handler.timeout.IdleStateEvent; |
||||
|
||||
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* @author jiangli |
||||
* @date 2021-01-13 19:20 |
||||
*/ |
||||
public class NettyServerHandler extends ChannelInboundHandlerAdapter { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); |
||||
@Override |
||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { |
||||
super.channelRegistered(ctx); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void channelInactive(ChannelHandlerContext ctx){ |
||||
logger.info("channel close"); |
||||
ctx.channel().close(); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception { |
||||
System.out.println("客户端连接成功!"+ctx.channel().remoteAddress()); |
||||
logger.info("客户端连接成功!"+ctx.channel().remoteAddress()); |
||||
} |
||||
|
||||
@Override |
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) { |
||||
logger.info("server read msg"); |
||||
System.out.println("收到消息"); |
||||
RpcRequest req= (RpcRequest) msg; |
||||
System.out.println(req.getRequestId()); |
||||
} |
||||
|
||||
@Override |
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { |
||||
if (evt instanceof IdleStateEvent) { |
||||
ctx.channel().close(); |
||||
} else { |
||||
super.userEventTriggered(ctx, evt); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
||||
System.out.println("exceptionCaught"); |
||||
logger.error("exceptionCaught : {}", cause.getMessage(), cause); |
||||
ctx.channel().close(); |
||||
} |
||||
} |
Loading…
Reference in new issue