From 457807936883959f70dcc8230329311e707b33fe Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Mon, 1 Feb 2021 22:18:47 +0800 Subject: [PATCH] rpc protocol --- .../rpc/client/ConsumerInterceptor.java | 20 ++++++++++----- .../rpc/codec/NettyDecoder.java | 23 +++++++---------- .../rpc/codec/NettyEncoder.java | 24 ++++++------------ .../rpc/future/RpcFuture.java | 19 ++++++++------ .../rpc/protocol/EventType.java | 1 - .../rpc/protocol/MessageHeader.java | 20 ++++----------- .../rpc/protocol/RpcProtocolConstants.java | 2 +- .../rpc/remote/NettyClient.java | 10 ++++---- .../rpc/remote/NettyClientHandler.java | 18 +++++-------- .../rpc/remote/NettyServerHandler.java | 15 +++++------ .../apache/dolphinscheduler/rpc/MainTest.java | 25 ++++--------------- 11 files changed, 69 insertions(+), 108 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java index cf0d2d240e..15281a3d2c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java @@ -29,7 +29,9 @@ import org.apache.dolphinscheduler.rpc.remote.NettyClient; import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer; import java.lang.reflect.Method; -import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import net.bytebuddy.implementation.bind.annotation.AllArguments; import net.bytebuddy.implementation.bind.annotation.Origin; @@ -40,6 +42,7 @@ import net.bytebuddy.implementation.bind.annotation.RuntimeType; */ public class ConsumerInterceptor { + private static final Logger logger = LoggerFactory.getLogger(ConsumerInterceptor.class); private Host host; private NettyClient nettyClient = NettyClient.getInstance(); @@ -61,10 +64,15 @@ public class ConsumerInterceptor { int retries = consumerConfig.getRetries(); - RpcProtocol protocol=buildProtocol(request); + RpcProtocol protocol = buildProtocol(request); while (retries-- > 0) { - RpcResponse rsp = nettyClient.sendMsg(host, protocol, async); + RpcResponse rsp = null; + try { + rsp = nettyClient.sendMsg(host, protocol, async); + } catch (InterruptedException e) { + logger.warn("send msg error ", e); + } //success if (null != rsp && rsp.getStatus() == 0) { return rsp.getResult(); @@ -102,9 +110,9 @@ public class ConsumerInterceptor { return consumerConfig; } - private RpcProtocol buildProtocol(RpcRequest req){ - RpcProtocol protocol=new RpcProtocol<>(); - MessageHeader header=new MessageHeader(); + private RpcProtocol buildProtocol(RpcRequest req) { + RpcProtocol protocol = new RpcProtocol<>(); + MessageHeader header = new MessageHeader(); header.setRequestId(RpcRequestTable.getRequestId()); header.setEventType(EventType.REQUEST.getType()); header.setSerialization(RpcSerializer.PROTOSTUFF.getType()); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java index 9e27e5bc04..813bf719f2 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java @@ -43,39 +43,35 @@ public class NettyDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { - - if (byteBuf.readableBytes() < RpcProtocolConstants.HEADER_LENGTH) { - System.out.println("llllll 长度不够"); return; } + byteBuf.markReaderIndex(); short magic = byteBuf.readShort(); + if (RpcProtocolConstants.MAGIC != magic) { throw new IllegalArgumentException("magic number is illegal, " + magic); } byte eventType = byteBuf.readByte(); byte version = byteBuf.readByte(); - byte serialization=byteBuf.readByte(); - - byte state = byteBuf.readByte(); - long requestId=byteBuf.readLong(); + byte serialization = byteBuf.readByte(); + long requestId = byteBuf.readLong(); int dataLength = byteBuf.readInt(); + byte[] data = new byte[dataLength]; - byte[] data=new byte[dataLength]; - byteBuf.readBytes(data); - RpcProtocol rpcProtocol=new RpcProtocol(); - MessageHeader header=new MessageHeader(); + RpcProtocol rpcProtocol = new RpcProtocol(); + MessageHeader header = new MessageHeader(); header.setVersion(version); header.setSerialization(serialization); - header.setStatus(state); header.setRequestId(requestId); header.setEventType(eventType); header.setMsgLength(dataLength); + byteBuf.readBytes(data); rpcProtocol.setMsgHeader(header); - if(eventType!= EventType.HEARTBEAT.getType()){ + if (eventType != EventType.HEARTBEAT.getType()) { Serializer serializer = RpcSerializer.getSerializerByType(serialization); Object obj = serializer.deserialize(data, genericClass); rpcProtocol.setBody(obj); @@ -83,5 +79,4 @@ public class NettyDecoder extends ByteToMessageDecoder { list.add(rpcProtocol); } - } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java index e28bc316d5..4f1fd53c2a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java @@ -17,10 +17,8 @@ package org.apache.dolphinscheduler.rpc.codec; -import org.apache.dolphinscheduler.rpc.protocol.EventType; import org.apache.dolphinscheduler.rpc.protocol.MessageHeader; import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol; -import org.apache.dolphinscheduler.rpc.serializer.ProtoStuffUtils; import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer; import org.apache.dolphinscheduler.rpc.serializer.Serializer; @@ -40,28 +38,20 @@ public class NettyEncoder extends MessageToByteEncoder> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol msg, ByteBuf byteBuf) throws Exception { - - MessageHeader msgHeader = msg.getMsgHeader(); byteBuf.writeShort(msgHeader.getMagic()); - if(msgHeader.getEventType()== EventType.HEARTBEAT.getType()){ - byteBuf.writeByte(EventType.HEARTBEAT.getType()); - System.out.println("heart beat "); - return; - } - - byteBuf.writeByte(msgHeader.getEventType()); byteBuf.writeByte(msgHeader.getVersion()); byteBuf.writeByte(msgHeader.getSerialization()); - - byteBuf.writeByte(msgHeader.getStatus()); byteBuf.writeLong(msgHeader.getRequestId()); - + byte[] data = new byte[0]; + int msgLength = msgHeader.getMsgLength(); Serializer rpcSerializer = RpcSerializer.getSerializerByType(msgHeader.getSerialization()); - - byte[] data = rpcSerializer.serialize(msg.getBody()); - byteBuf.writeInt(data.length); + if (null != rpcSerializer) { + data = rpcSerializer.serialize(msg.getBody()); + msgLength = data.length; + } + byteBuf.writeInt(msgLength); byteBuf.writeBytes(data); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java index 603aaa089f..73ceb25404 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java @@ -37,8 +37,11 @@ public class RpcFuture implements Future { private RpcRequest request; - public RpcFuture(RpcRequest rpcRequest) { + private long requestId; + + public RpcFuture(RpcRequest rpcRequest,long requestId) { this.request = rpcRequest; + this.requestId=requestId; } @Override @@ -57,24 +60,24 @@ public class RpcFuture implements Future { } @Override - public RpcResponse get() throws InterruptedException, ExecutionException { + public RpcResponse get() throws InterruptedException { boolean success = latch.await(5, TimeUnit.SECONDS); - /* if (!success) { - throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId() + if (!success) { + throw new RuntimeException("Timeout exception. Request id: " + this.requestId + ". Request class name: " + this.request.getClassName() + ". Request method: " + this.request.getMethodName()); - }*/ + } return response; } @Override public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { boolean success = latch.await(timeout, unit); - /* if (!success) { - throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId() + if (!success) { + throw new RuntimeException("Timeout exception. Request id: " + requestId + ". Request class name: " + this.request.getClassName() + ". Request method: " + this.request.getMethodName()); - }*/ + } return response; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java index 7f9ce6a4c8..e6a85d3b8d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java @@ -23,7 +23,6 @@ public enum EventType { REQUEST((byte)2,"business request"), RESPONSE((byte)3,"business response"); - private Byte type; private String description; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java index 5a176d6240..d16b5883d3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java @@ -19,19 +19,17 @@ package org.apache.dolphinscheduler.rpc.protocol; public class MessageHeader { - private byte version; + private byte version = 1; private byte eventType; - private int msgLength; + private int msgLength = 0; - private byte status; + private long requestId = 0L; - private long requestId; + private byte serialization = 0; - private byte serialization; - - private short magic = RpcProtocolConstants.MAGIC; + private short magic = RpcProtocolConstants.MAGIC; public short getMagic() { return magic; @@ -61,14 +59,6 @@ public class MessageHeader { this.msgLength = msgLength; } - public byte getStatus() { - return status; - } - - public void setStatus(byte status) { - this.status = status; - } - public long getRequestId() { return requestId; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocolConstants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocolConstants.java index 549b29a993..8beee5b248 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocolConstants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocolConstants.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.rpc.protocol; public class RpcProtocolConstants { - public static final int HEADER_LENGTH = 18; + public static final int HEADER_LENGTH = 17; public static final short MAGIC = (short) 0xbabe; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java index 9a247d9b28..2df158e454 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -192,17 +191,18 @@ public class NettyClient { isStarted.compareAndSet(false, true); } - public RpcResponse sendMsg(Host host, RpcProtocol protocol, Boolean async) { + public RpcResponse sendMsg(Host host, RpcProtocol protocol, Boolean async) throws InterruptedException { Channel channel = getChannel(host); assert channel != null; - RpcRequest request=protocol.getBody(); + RpcRequest request = protocol.getBody(); RpcRequestCache rpcRequestCache = new RpcRequestCache(); String serviceName = request.getClassName() + request.getMethodName(); rpcRequestCache.setServiceName(serviceName); + long reqId = protocol.getMsgHeader().getRequestId(); RpcFuture future = null; if (Boolean.FALSE.equals(async)) { - future = new RpcFuture(request); + future = new RpcFuture(request, reqId); rpcRequestCache.setRpcFuture(future); } RpcRequestTable.put(protocol.getMsgHeader().getRequestId(), rpcRequestCache); @@ -217,7 +217,7 @@ public class NettyClient { try { assert future != null; result = future.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { logger.error("send msg error,service name is {}", serviceName, e); Thread.currentThread().interrupt(); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java index e363fa80be..6bc61cff40 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java @@ -21,8 +21,6 @@ import org.apache.dolphinscheduler.rpc.client.ConsumerConfig; import org.apache.dolphinscheduler.rpc.client.ConsumerConfigCache; import org.apache.dolphinscheduler.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.rpc.client.RpcRequestTable; -import org.apache.dolphinscheduler.rpc.common.RequestEventType; -import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; import org.apache.dolphinscheduler.rpc.future.RpcFuture; @@ -58,21 +56,20 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - System.out.println("xxxxxxxx"+msg.getClass().getSimpleName()); - RpcProtocol rpcProtocol= (RpcProtocol) msg; + RpcProtocol rpcProtocol = (RpcProtocol) msg; RpcResponse rsp = (RpcResponse) rpcProtocol.getBody(); - long reqId=rpcProtocol.getMsgHeader().getRequestId(); + long reqId = rpcProtocol.getMsgHeader().getRequestId(); RpcRequestCache rpcRequest = RpcRequestTable.get(reqId); if (null == rpcRequest) { logger.warn("rpc read error,this request does not exist"); return; } - threadPoolManager.addExecuteTask(() -> readHandler(rsp, rpcRequest,reqId)); + threadPoolManager.addExecuteTask(() -> readHandler(rsp, rpcRequest, reqId)); } - private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest,long reqId) { + private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest, long reqId) { String serviceName = rpcRequest.getServiceName(); ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName); if (Boolean.FALSE.equals(consumerConfig.getAsync())) { @@ -82,7 +79,6 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { return; } - //async if (rsp.getStatus() == 0) { @@ -101,14 +97,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - RpcProtocol rpcProtocol=new RpcProtocol(); - MessageHeader messageHeader=new MessageHeader(); + RpcProtocol rpcProtocol = new RpcProtocol(); + MessageHeader messageHeader = new MessageHeader(); messageHeader.setEventType(EventType.HEARTBEAT.getType()); rpcProtocol.setMsgHeader(messageHeader); - rpcProtocol.setBody(new RpcRequest()); ctx.channel().writeAndFlush(rpcProtocol); logger.debug("send heart beat msg..."); - } else { super.userEventTriggered(ctx, evt); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java index c19a29069d..9dd4315a16 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.rpc.remote; -import org.apache.dolphinscheduler.rpc.common.RequestEventType; import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; @@ -55,22 +54,20 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } @Override + @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { - System.out.println("channel read"+msg.getClass().getSimpleName()); - RpcProtocol rpcProtocol= (RpcProtocol) msg; - if(rpcProtocol.getMsgHeader().getEventType()==EventType.HEARTBEAT.getType()){ + RpcProtocol rpcProtocol = (RpcProtocol) msg; + if (rpcProtocol.getMsgHeader().getEventType() == EventType.HEARTBEAT.getType()) { + logger.info("heart beat"); return; } threadPoolManager.addExecuteTask(() -> readHandler(ctx, rpcProtocol)); } private void readHandler(ChannelHandlerContext ctx, RpcProtocol protocol) { - - RpcRequest req= (RpcRequest) protocol.getBody(); + RpcRequest req = (RpcRequest) protocol.getBody(); RpcResponse response = new RpcResponse(); - // response.setRequestId(req.getRequestId()); - response.setStatus((byte) 0); String classname = req.getClassName(); @@ -90,7 +87,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { result = method.invoke(object, arguments); } catch (Exception e) { - logger.error("netty server execute error,service name {}", classname + methodName, e); + logger.error("netty server execute error,service name :{} method name :{} ", classname + methodName, e); response.setStatus((byte) -1); } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/MainTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/MainTest.java index 59528d0cf9..8a64f9988e 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/MainTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/MainTest.java @@ -1,4 +1,4 @@ -package org.apache.dolphinscheduler.rpc;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,18 +15,14 @@ package org.apache.dolphinscheduler.rpc;/* * limitations under the License. */ -import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +package org.apache.dolphinscheduler.rpc; + import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.rpc.client.IRpcClient; import org.apache.dolphinscheduler.rpc.client.RpcClient; -import org.apache.dolphinscheduler.rpc.protocol.RpcProtocolConstants; import org.apache.dolphinscheduler.rpc.remote.NettyClient; -import org.apache.dolphinscheduler.rpc.remote.NettyServer; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; public class MainTest { @@ -38,18 +34,9 @@ public class MainTest { public static void main(String[] args) throws Exception { IRpcClient rpcClient = new RpcClient(); - Host host = new Host("127.0.0.1", 12346); - IUserService userService = rpcClient.create(IUserService.class, host); - Integer result = userService.hi(3); - // Assert.assertSame(4, result); - result = userService.hi(4); - // Assert.assertSame(5, result); - // userService.say("sync"); - + Host host = new Host("127.0.0.1", 12346); + IUserService userService = rpcClient.create(IUserService.class, host); - // NettyClient nettyClient = NettyClient.getInstance(); - // NettyClient.getInstance().close(); - // nettyServer.close(); } public void sendTest() { @@ -58,8 +45,6 @@ public class MainTest { result = userService.hi(4); Assert.assertSame(5, result); userService.say("sync"); - - NettyClient.getInstance().close(); }