Browse Source

rpc protocol

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
4578079368
  1. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java
  2. 15
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java
  3. 24
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java
  4. 19
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java
  5. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java
  6. 18
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java
  7. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocolConstants.java
  8. 8
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
  9. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java
  10. 9
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java
  11. 21
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/MainTest.java

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java

@ -29,7 +29,9 @@ import org.apache.dolphinscheduler.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer; import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.bytebuddy.implementation.bind.annotation.AllArguments; import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin; import net.bytebuddy.implementation.bind.annotation.Origin;
@ -40,6 +42,7 @@ import net.bytebuddy.implementation.bind.annotation.RuntimeType;
*/ */
public class ConsumerInterceptor { public class ConsumerInterceptor {
private static final Logger logger = LoggerFactory.getLogger(ConsumerInterceptor.class);
private Host host; private Host host;
private NettyClient nettyClient = NettyClient.getInstance(); private NettyClient nettyClient = NettyClient.getInstance();
@ -64,7 +67,12 @@ public class ConsumerInterceptor {
RpcProtocol protocol = buildProtocol(request); RpcProtocol protocol = buildProtocol(request);
while (retries-- > 0) { while (retries-- > 0) {
RpcResponse rsp = nettyClient.sendMsg(host, protocol, async); RpcResponse rsp = null;
try {
rsp = nettyClient.sendMsg(host, protocol, async);
} catch (InterruptedException e) {
logger.warn("send msg error ", e);
}
//success //success
if (null != rsp && rsp.getStatus() == 0) { if (null != rsp && rsp.getStatus() == 0) {
return rsp.getResult(); return rsp.getResult();

15
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java

@ -43,37 +43,33 @@ public class NettyDecoder extends ByteToMessageDecoder {
@Override @Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if (byteBuf.readableBytes() < RpcProtocolConstants.HEADER_LENGTH) { if (byteBuf.readableBytes() < RpcProtocolConstants.HEADER_LENGTH) {
System.out.println("llllll 长度不够");
return; return;
} }
byteBuf.markReaderIndex(); byteBuf.markReaderIndex();
short magic = byteBuf.readShort(); short magic = byteBuf.readShort();
if (RpcProtocolConstants.MAGIC != magic) { if (RpcProtocolConstants.MAGIC != magic) {
throw new IllegalArgumentException("magic number is illegal, " + magic); throw new IllegalArgumentException("magic number is illegal, " + magic);
} }
byte eventType = byteBuf.readByte(); byte eventType = byteBuf.readByte();
byte version = byteBuf.readByte(); byte version = byteBuf.readByte();
byte serialization = byteBuf.readByte(); byte serialization = byteBuf.readByte();
byte state = byteBuf.readByte();
long requestId = byteBuf.readLong(); long requestId = byteBuf.readLong();
int dataLength = byteBuf.readInt(); int dataLength = byteBuf.readInt();
byte[] data = new byte[dataLength]; byte[] data = new byte[dataLength];
byteBuf.readBytes(data);
RpcProtocol rpcProtocol = new RpcProtocol(); RpcProtocol rpcProtocol = new RpcProtocol();
MessageHeader header=new MessageHeader();
MessageHeader header = new MessageHeader();
header.setVersion(version); header.setVersion(version);
header.setSerialization(serialization); header.setSerialization(serialization);
header.setStatus(state);
header.setRequestId(requestId); header.setRequestId(requestId);
header.setEventType(eventType); header.setEventType(eventType);
header.setMsgLength(dataLength); header.setMsgLength(dataLength);
byteBuf.readBytes(data);
rpcProtocol.setMsgHeader(header); rpcProtocol.setMsgHeader(header);
if (eventType != EventType.HEARTBEAT.getType()) { if (eventType != EventType.HEARTBEAT.getType()) {
Serializer serializer = RpcSerializer.getSerializerByType(serialization); Serializer serializer = RpcSerializer.getSerializerByType(serialization);
@ -83,5 +79,4 @@ public class NettyDecoder extends ByteToMessageDecoder {
list.add(rpcProtocol); list.add(rpcProtocol);
} }
} }

24
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java

@ -17,10 +17,8 @@
package org.apache.dolphinscheduler.rpc.codec; package org.apache.dolphinscheduler.rpc.codec;
import org.apache.dolphinscheduler.rpc.protocol.EventType;
import org.apache.dolphinscheduler.rpc.protocol.MessageHeader; import org.apache.dolphinscheduler.rpc.protocol.MessageHeader;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol; import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import org.apache.dolphinscheduler.rpc.serializer.ProtoStuffUtils;
import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer; import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer;
import org.apache.dolphinscheduler.rpc.serializer.Serializer; import org.apache.dolphinscheduler.rpc.serializer.Serializer;
@ -40,28 +38,20 @@ public class NettyEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {
@Override @Override
protected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol<Object> msg, ByteBuf byteBuf) throws Exception { protected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol<Object> msg, ByteBuf byteBuf) throws Exception {
MessageHeader msgHeader = msg.getMsgHeader(); MessageHeader msgHeader = msg.getMsgHeader();
byteBuf.writeShort(msgHeader.getMagic()); byteBuf.writeShort(msgHeader.getMagic());
if(msgHeader.getEventType()== EventType.HEARTBEAT.getType()){
byteBuf.writeByte(EventType.HEARTBEAT.getType());
System.out.println("heart beat ");
return;
}
byteBuf.writeByte(msgHeader.getEventType()); byteBuf.writeByte(msgHeader.getEventType());
byteBuf.writeByte(msgHeader.getVersion()); byteBuf.writeByte(msgHeader.getVersion());
byteBuf.writeByte(msgHeader.getSerialization()); byteBuf.writeByte(msgHeader.getSerialization());
byteBuf.writeByte(msgHeader.getStatus());
byteBuf.writeLong(msgHeader.getRequestId()); byteBuf.writeLong(msgHeader.getRequestId());
byte[] data = new byte[0];
int msgLength = msgHeader.getMsgLength();
Serializer rpcSerializer = RpcSerializer.getSerializerByType(msgHeader.getSerialization()); Serializer rpcSerializer = RpcSerializer.getSerializerByType(msgHeader.getSerialization());
if (null != rpcSerializer) {
byte[] data = rpcSerializer.serialize(msg.getBody()); data = rpcSerializer.serialize(msg.getBody());
byteBuf.writeInt(data.length); msgLength = data.length;
}
byteBuf.writeInt(msgLength);
byteBuf.writeBytes(data); byteBuf.writeBytes(data);
} }
} }

19
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java

@ -37,8 +37,11 @@ public class RpcFuture implements Future<Object> {
private RpcRequest request; private RpcRequest request;
public RpcFuture(RpcRequest rpcRequest) { private long requestId;
public RpcFuture(RpcRequest rpcRequest,long requestId) {
this.request = rpcRequest; this.request = rpcRequest;
this.requestId=requestId;
} }
@Override @Override
@ -57,24 +60,24 @@ public class RpcFuture implements Future<Object> {
} }
@Override @Override
public RpcResponse get() throws InterruptedException, ExecutionException { public RpcResponse get() throws InterruptedException {
boolean success = latch.await(5, TimeUnit.SECONDS); boolean success = latch.await(5, TimeUnit.SECONDS);
/* if (!success) { if (!success) {
throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId() throw new RuntimeException("Timeout exception. Request id: " + this.requestId
+ ". Request class name: " + this.request.getClassName() + ". Request class name: " + this.request.getClassName()
+ ". Request method: " + this.request.getMethodName()); + ". Request method: " + this.request.getMethodName());
}*/ }
return response; return response;
} }
@Override @Override
public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
boolean success = latch.await(timeout, unit); boolean success = latch.await(timeout, unit);
/* if (!success) { if (!success) {
throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId() throw new RuntimeException("Timeout exception. Request id: " + requestId
+ ". Request class name: " + this.request.getClassName() + ". Request class name: " + this.request.getClassName()
+ ". Request method: " + this.request.getMethodName()); + ". Request method: " + this.request.getMethodName());
}*/ }
return response; return response;
} }

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java

@ -23,7 +23,6 @@ public enum EventType {
REQUEST((byte)2,"business request"), REQUEST((byte)2,"business request"),
RESPONSE((byte)3,"business response"); RESPONSE((byte)3,"business response");
private Byte type; private Byte type;
private String description; private String description;

18
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java

@ -19,17 +19,15 @@ package org.apache.dolphinscheduler.rpc.protocol;
public class MessageHeader { public class MessageHeader {
private byte version; private byte version = 1;
private byte eventType; private byte eventType;
private int msgLength; private int msgLength = 0;
private byte status; private long requestId = 0L;
private long requestId; private byte serialization = 0;
private byte serialization;
private short magic = RpcProtocolConstants.MAGIC; private short magic = RpcProtocolConstants.MAGIC;
@ -61,14 +59,6 @@ public class MessageHeader {
this.msgLength = msgLength; this.msgLength = msgLength;
} }
public byte getStatus() {
return status;
}
public void setStatus(byte status) {
this.status = status;
}
public long getRequestId() { public long getRequestId() {
return requestId; return requestId;
} }

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocolConstants.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.rpc.protocol;
public class RpcProtocolConstants { public class RpcProtocolConstants {
public static final int HEADER_LENGTH = 18; public static final int HEADER_LENGTH = 17;
public static final short MAGIC = (short) 0xbabe; public static final short MAGIC = (short) 0xbabe;

8
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java

@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -192,7 +191,7 @@ public class NettyClient {
isStarted.compareAndSet(false, true); isStarted.compareAndSet(false, true);
} }
public RpcResponse sendMsg(Host host, RpcProtocol<RpcRequest> protocol, Boolean async) { public RpcResponse sendMsg(Host host, RpcProtocol<RpcRequest> protocol, Boolean async) throws InterruptedException {
Channel channel = getChannel(host); Channel channel = getChannel(host);
assert channel != null; assert channel != null;
@ -200,9 +199,10 @@ public class NettyClient {
RpcRequestCache rpcRequestCache = new RpcRequestCache(); RpcRequestCache rpcRequestCache = new RpcRequestCache();
String serviceName = request.getClassName() + request.getMethodName(); String serviceName = request.getClassName() + request.getMethodName();
rpcRequestCache.setServiceName(serviceName); rpcRequestCache.setServiceName(serviceName);
long reqId = protocol.getMsgHeader().getRequestId();
RpcFuture future = null; RpcFuture future = null;
if (Boolean.FALSE.equals(async)) { if (Boolean.FALSE.equals(async)) {
future = new RpcFuture(request); future = new RpcFuture(request, reqId);
rpcRequestCache.setRpcFuture(future); rpcRequestCache.setRpcFuture(future);
} }
RpcRequestTable.put(protocol.getMsgHeader().getRequestId(), rpcRequestCache); RpcRequestTable.put(protocol.getMsgHeader().getRequestId(), rpcRequestCache);
@ -217,7 +217,7 @@ public class NettyClient {
try { try {
assert future != null; assert future != null;
result = future.get(); result = future.get();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException e) {
logger.error("send msg error,service name is {}", serviceName, e); logger.error("send msg error,service name is {}", serviceName, e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java

@ -21,8 +21,6 @@ import org.apache.dolphinscheduler.rpc.client.ConsumerConfig;
import org.apache.dolphinscheduler.rpc.client.ConsumerConfigCache; import org.apache.dolphinscheduler.rpc.client.ConsumerConfigCache;
import org.apache.dolphinscheduler.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.rpc.client.RpcRequestTable; import org.apache.dolphinscheduler.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.rpc.common.RequestEventType;
import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager;
import org.apache.dolphinscheduler.rpc.future.RpcFuture; import org.apache.dolphinscheduler.rpc.future.RpcFuture;
@ -58,7 +56,6 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("xxxxxxxx"+msg.getClass().getSimpleName());
RpcProtocol rpcProtocol = (RpcProtocol) msg; RpcProtocol rpcProtocol = (RpcProtocol) msg;
RpcResponse rsp = (RpcResponse) rpcProtocol.getBody(); RpcResponse rsp = (RpcResponse) rpcProtocol.getBody();
@ -82,7 +79,6 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
return; return;
} }
//async
if (rsp.getStatus() == 0) { if (rsp.getStatus() == 0) {
@ -105,10 +101,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
MessageHeader messageHeader = new MessageHeader(); MessageHeader messageHeader = new MessageHeader();
messageHeader.setEventType(EventType.HEARTBEAT.getType()); messageHeader.setEventType(EventType.HEARTBEAT.getType());
rpcProtocol.setMsgHeader(messageHeader); rpcProtocol.setMsgHeader(messageHeader);
rpcProtocol.setBody(new RpcRequest());
ctx.channel().writeAndFlush(rpcProtocol); ctx.channel().writeAndFlush(rpcProtocol);
logger.debug("send heart beat msg..."); logger.debug("send heart beat msg...");
} else { } else {
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);
} }

9
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.rpc.remote; package org.apache.dolphinscheduler.rpc.remote;
import org.apache.dolphinscheduler.rpc.common.RequestEventType;
import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager;
@ -55,22 +54,20 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
} }
@Override @Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("channel read"+msg.getClass().getSimpleName());
RpcProtocol<RpcRequest> rpcProtocol = (RpcProtocol<RpcRequest>) msg; RpcProtocol<RpcRequest> rpcProtocol = (RpcProtocol<RpcRequest>) msg;
if (rpcProtocol.getMsgHeader().getEventType() == EventType.HEARTBEAT.getType()) { if (rpcProtocol.getMsgHeader().getEventType() == EventType.HEARTBEAT.getType()) {
logger.info("heart beat");
return; return;
} }
threadPoolManager.addExecuteTask(() -> readHandler(ctx, rpcProtocol)); threadPoolManager.addExecuteTask(() -> readHandler(ctx, rpcProtocol));
} }
private void readHandler(ChannelHandlerContext ctx, RpcProtocol protocol) { private void readHandler(ChannelHandlerContext ctx, RpcProtocol protocol) {
RpcRequest req = (RpcRequest) protocol.getBody(); RpcRequest req = (RpcRequest) protocol.getBody();
RpcResponse response = new RpcResponse(); RpcResponse response = new RpcResponse();
// response.setRequestId(req.getRequestId());
response.setStatus((byte) 0); response.setStatus((byte) 0);
String classname = req.getClassName(); String classname = req.getClassName();
@ -90,7 +87,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
result = method.invoke(object, arguments); result = method.invoke(object, arguments);
} catch (Exception e) { } catch (Exception e) {
logger.error("netty server execute error,service name {}", classname + methodName, e); logger.error("netty server execute error,service name :{} method name :{} ", classname + methodName, e);
response.setStatus((byte) -1); response.setStatus((byte) -1);
} }

21
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/MainTest.java

@ -1,4 +1,4 @@
package org.apache.dolphinscheduler.rpc;/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
@ -15,18 +15,14 @@ package org.apache.dolphinscheduler.rpc;/*
* limitations under the License. * limitations under the License.
*/ */
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; package org.apache.dolphinscheduler.rpc;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.rpc.client.IRpcClient; import org.apache.dolphinscheduler.rpc.client.IRpcClient;
import org.apache.dolphinscheduler.rpc.client.RpcClient; import org.apache.dolphinscheduler.rpc.client.RpcClient;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocolConstants;
import org.apache.dolphinscheduler.rpc.remote.NettyClient; import org.apache.dolphinscheduler.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.rpc.remote.NettyServer;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class MainTest { public class MainTest {
@ -40,16 +36,7 @@ public class MainTest {
IRpcClient rpcClient = new RpcClient(); IRpcClient rpcClient = new RpcClient();
Host host = new Host("127.0.0.1", 12346); Host host = new Host("127.0.0.1", 12346);
IUserService userService = rpcClient.create(IUserService.class, host); IUserService userService = rpcClient.create(IUserService.class, host);
Integer result = userService.hi(3);
// Assert.assertSame(4, result);
result = userService.hi(4);
// Assert.assertSame(5, result);
// userService.say("sync");
// NettyClient nettyClient = NettyClient.getInstance();
// NettyClient.getInstance().close();
// nettyServer.close();
} }
public void sendTest() { public void sendTest() {
@ -58,8 +45,6 @@ public class MainTest {
result = userService.hi(4); result = userService.hi(4);
Assert.assertSame(5, result); Assert.assertSame(5, result);
userService.say("sync"); userService.say("sync");
NettyClient.getInstance().close(); NettyClient.getInstance().close();
} }

Loading…
Cancel
Save