Browse Source

rpc protocol

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
392136cd24
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
  2. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java
  3. 50
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java
  4. 36
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java
  5. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java
  6. 8
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java
  7. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java
  8. 26
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocolConstants.java
  9. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
  10. 26
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java
  11. 26
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java
  12. 5
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffSerializer.java
  13. 67
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/MainTest.java
  14. 26
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/Server.java
  15. 2
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java

@ -31,7 +31,7 @@ public class Constants {
public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000;
public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 60;
public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 6;
/**
* charset

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java

@ -64,7 +64,7 @@ public class ConsumerInterceptor {
RpcProtocol protocol=buildProtocol(request);
while (retries-- > 0) {
RpcResponse rsp = nettyClient.sendMsg(host, request, async);
RpcResponse rsp = nettyClient.sendMsg(host, protocol, async);
//success
if (null != rsp && rsp.getStatus() == 0) {
return rsp.getResult();

50
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java

@ -17,6 +17,10 @@
package org.apache.dolphinscheduler.rpc.codec;
import org.apache.dolphinscheduler.rpc.protocol.EventType;
import org.apache.dolphinscheduler.rpc.protocol.MessageHeader;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocolConstants;
import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer;
import org.apache.dolphinscheduler.rpc.serializer.Serializer;
@ -39,23 +43,45 @@ public class NettyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if (byteBuf.readableBytes() < 4) {
if (byteBuf.readableBytes() < RpcProtocolConstants.HEADER_LENGTH) {
System.out.println("llllll 长度不够");
return;
}
byteBuf.markReaderIndex();
int dataLength = byteBuf.readInt();
if (dataLength < 0) {
channelHandlerContext.close();
}
if (byteBuf.readableBytes() < dataLength) {
byteBuf.resetReaderIndex();
short magic = byteBuf.readShort();
if (RpcProtocolConstants.MAGIC != magic) {
throw new IllegalArgumentException("magic number is illegal, " + magic);
}
byte eventType = byteBuf.readByte();
byte version = byteBuf.readByte();
byte serialization=byteBuf.readByte();
byte serializerType = 1;
byte[] data = new byte[dataLength];
byte state = byteBuf.readByte();
long requestId=byteBuf.readLong();
int dataLength = byteBuf.readInt();
byte[] data=new byte[dataLength];
byteBuf.readBytes(data);
Serializer serializer = RpcSerializer.getSerializerByType(serializerType);
Object obj = serializer.deserialize(data, genericClass);
list.add(obj);
RpcProtocol rpcProtocol=new RpcProtocol();
MessageHeader header=new MessageHeader();
header.setVersion(version);
header.setSerialization(serialization);
header.setStatus(state);
header.setRequestId(requestId);
header.setEventType(eventType);
header.setMsgLength(dataLength);
rpcProtocol.setMsgHeader(header);
if(eventType!= EventType.HEARTBEAT.getType()){
Serializer serializer = RpcSerializer.getSerializerByType(serialization);
Object obj = serializer.deserialize(data, genericClass);
rpcProtocol.setBody(obj);
}
list.add(rpcProtocol);
}
}

36
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java

@ -17,7 +17,12 @@
package org.apache.dolphinscheduler.rpc.codec;
import org.apache.dolphinscheduler.rpc.protocol.EventType;
import org.apache.dolphinscheduler.rpc.protocol.MessageHeader;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import org.apache.dolphinscheduler.rpc.serializer.ProtoStuffUtils;
import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer;
import org.apache.dolphinscheduler.rpc.serializer.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -26,9 +31,7 @@ import io.netty.handler.codec.MessageToByteEncoder;
/**
* NettyEncoder
*/
public class NettyEncoder extends MessageToByteEncoder {
public class NettyEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {
private Class<?> genericClass;
public NettyEncoder(Class<?> genericClass) {
@ -36,12 +39,29 @@ public class NettyEncoder extends MessageToByteEncoder {
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
if (genericClass.isInstance(o)) {
byte[] data = ProtoStuffUtils.serialize(o);
byteBuf.writeInt(data.length);
byteBuf.writeBytes(data);
protected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol<Object> msg, ByteBuf byteBuf) throws Exception {
MessageHeader msgHeader = msg.getMsgHeader();
byteBuf.writeShort(msgHeader.getMagic());
if(msgHeader.getEventType()== EventType.HEARTBEAT.getType()){
byteBuf.writeByte(EventType.HEARTBEAT.getType());
System.out.println("heart beat ");
return;
}
byteBuf.writeByte(msgHeader.getEventType());
byteBuf.writeByte(msgHeader.getVersion());
byteBuf.writeByte(msgHeader.getSerialization());
byteBuf.writeByte(msgHeader.getStatus());
byteBuf.writeLong(msgHeader.getRequestId());
Serializer rpcSerializer = RpcSerializer.getSerializerByType(msgHeader.getSerialization());
byte[] data = rpcSerializer.serialize(msg.getBody());
byteBuf.writeInt(data.length);
byteBuf.writeBytes(data);
}
}

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java

@ -56,6 +56,7 @@ public class ServiceBean {
serviceMap.put(rpcService.value(), rpcClass);
logger.info("load rpc service {}", rpcService.value());
});
initialized.set(true);
}
public static Class getServiceClass(String className) {

8
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/future/RpcFuture.java

@ -59,22 +59,22 @@ public class RpcFuture implements Future<Object> {
@Override
public RpcResponse get() throws InterruptedException, ExecutionException {
boolean success = latch.await(5, TimeUnit.SECONDS);
if (!success) {
/* if (!success) {
throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId()
+ ". Request class name: " + this.request.getClassName()
+ ". Request method: " + this.request.getMethodName());
}
}*/
return response;
}
@Override
public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
boolean success = latch.await(timeout, unit);
if (!success) {
/* if (!success) {
throw new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId()
+ ". Request class name: " + this.request.getClassName()
+ ". Request method: " + this.request.getMethodName());
}
}*/
return response;
}

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java

@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.rpc.protocol;
public class MessageHeader {
private byte magic = (byte) 0xbabe;
private byte version;
private byte eventType;
@ -33,7 +31,9 @@ public class MessageHeader {
private byte serialization;
public byte getMagic() {
private short magic = RpcProtocolConstants.MAGIC;
public short getMagic() {
return magic;
}

26
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocolConstants.java

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.rpc.protocol;
public class RpcProtocolConstants {
public static final int HEADER_LENGTH = 18;
public static final short MAGIC = (short) 0xbabe;
}

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java

@ -206,8 +206,7 @@ public class NettyClient {
rpcRequestCache.setRpcFuture(future);
}
RpcRequestTable.put(protocol.getMsgHeader().getRequestId(), rpcRequestCache);
channel.writeAndFlush(request);
channel.writeAndFlush(protocol);
RpcResponse result = null;
if (Boolean.TRUE.equals(async)) {
result = new RpcResponse();

26
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java

@ -26,6 +26,9 @@ import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager;
import org.apache.dolphinscheduler.rpc.future.RpcFuture;
import org.apache.dolphinscheduler.rpc.protocol.EventType;
import org.apache.dolphinscheduler.rpc.protocol.MessageHeader;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import java.lang.reflect.InvocationTargetException;
@ -55,22 +58,26 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
RpcResponse rsp = (RpcResponse) msg;
RpcRequestCache rpcRequest = RpcRequestTable.get(rsp.getRequestId());
System.out.println("xxxxxxxx"+msg.getClass().getSimpleName());
RpcProtocol rpcProtocol= (RpcProtocol) msg;
RpcResponse rsp = (RpcResponse) rpcProtocol.getBody();
long reqId=rpcProtocol.getMsgHeader().getRequestId();
RpcRequestCache rpcRequest = RpcRequestTable.get(reqId);
if (null == rpcRequest) {
logger.warn("rpc read error,this request does not exist");
return;
}
threadPoolManager.addExecuteTask(() -> readHandler(rsp, rpcRequest));
threadPoolManager.addExecuteTask(() -> readHandler(rsp, rpcRequest,reqId));
}
private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest) {
private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest,long reqId) {
String serviceName = rpcRequest.getServiceName();
ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
if (Boolean.FALSE.equals(consumerConfig.getAsync())) {
RpcFuture future = rpcRequest.getRpcFuture();
RpcRequestTable.remove(rsp.getRequestId());
RpcRequestTable.remove(reqId);
future.done(rsp);
return;
@ -94,9 +101,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
RpcRequest request = new RpcRequest();
request.setEventType(RequestEventType.HEARTBEAT.getType());
ctx.channel().writeAndFlush(request);
RpcProtocol rpcProtocol=new RpcProtocol();
MessageHeader messageHeader=new MessageHeader();
messageHeader.setEventType(EventType.HEARTBEAT.getType());
rpcProtocol.setMsgHeader(messageHeader);
rpcProtocol.setBody(new RpcRequest());
ctx.channel().writeAndFlush(rpcProtocol);
logger.debug("send heart beat msg...");
} else {

26
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java

@ -22,6 +22,8 @@ import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager;
import org.apache.dolphinscheduler.rpc.config.ServiceBean;
import org.apache.dolphinscheduler.rpc.protocol.EventType;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import java.lang.reflect.Method;
@ -54,20 +56,20 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
RpcRequest req = (RpcRequest) msg;
if (req.getEventType().equals(RequestEventType.HEARTBEAT.getType())) {
logger.info("accept heartbeat msg");
System.out.println("channel read"+msg.getClass().getSimpleName());
RpcProtocol<RpcRequest> rpcProtocol= (RpcProtocol<RpcRequest>) msg;
if(rpcProtocol.getMsgHeader().getEventType()==EventType.HEARTBEAT.getType()){
return;
}
threadPoolManager.addExecuteTask(() -> readHandler(ctx, req));
threadPoolManager.addExecuteTask(() -> readHandler(ctx, rpcProtocol));
}
private void readHandler(ChannelHandlerContext ctx, RpcRequest req) {
private void readHandler(ChannelHandlerContext ctx, RpcProtocol protocol) {
RpcRequest req= (RpcRequest) protocol.getBody();
RpcResponse response = new RpcResponse();
response.setRequestId(req.getRequestId());
// response.setRequestId(req.getRequestId());
response.setStatus((byte) 0);
@ -93,13 +95,15 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
}
response.setResult(result);
ctx.writeAndFlush(response);
protocol.setBody(response);
protocol.getMsgHeader().setEventType(EventType.RESPONSE.getType());
ctx.writeAndFlush(protocol);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.channel().close();
logger.debug("IdleStateEvent triggered, send heartbeat to channel " + ctx.channel());
} else {
super.userEventTriggered(ctx, evt);
}

5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffSerializer.java

@ -15,7 +15,6 @@ package org.apache.dolphinscheduler.rpc.serializer;/*
* limitations under the License.
*/
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -36,7 +35,7 @@ public class ProtoStuffSerializer implements Serializer {
}
@Override
public <T> byte[] serialize(T obj) throws IOException {
public <T> byte[] serialize(T obj) {
Class<T> clazz = (Class<T>) obj.getClass();
Schema<T> schema = getSchema(clazz);
byte[] data;
@ -49,7 +48,7 @@ public class ProtoStuffSerializer implements Serializer {
}
@Override
public <T> T deserialize(byte[] data, Class<T> clz) throws IOException {
public <T> T deserialize(byte[] data, Class<T> clz) {
Schema<T> schema = getSchema(clz);
T obj = schema.newMessage();
if (null == obj) {

67
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/MainTest.java

@ -0,0 +1,67 @@
package org.apache.dolphinscheduler.rpc;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.rpc.client.IRpcClient;
import org.apache.dolphinscheduler.rpc.client.RpcClient;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocolConstants;
import org.apache.dolphinscheduler.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.rpc.remote.NettyServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class MainTest {
private IUserService userService;
private Host host;
public static void main(String[] args) throws Exception {
IRpcClient rpcClient = new RpcClient();
Host host = new Host("127.0.0.1", 12346);
IUserService userService = rpcClient.create(IUserService.class, host);
Integer result = userService.hi(3);
// Assert.assertSame(4, result);
result = userService.hi(4);
// Assert.assertSame(5, result);
// userService.say("sync");
// NettyClient nettyClient = NettyClient.getInstance();
// NettyClient.getInstance().close();
// nettyServer.close();
}
public void sendTest() {
Integer result = userService.hi(3);
Assert.assertSame(4, result);
result = userService.hi(4);
Assert.assertSame(5, result);
userService.say("sync");
NettyClient.getInstance().close();
}
}

26
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/Server.java

@ -0,0 +1,26 @@
package org.apache.dolphinscheduler.rpc;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.rpc.remote.NettyServer;
public class Server {
public static void main(String[] args) {
NettyServer nettyServer=new NettyServer(new NettyServerConfig());
}
}

2
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/UserService.java

@ -32,6 +32,8 @@ public class UserService implements IUserService {
@Override
public Integer hi(int num) {
System.out.println("hihihihi+"+num);
return ++num;
}
}

Loading…
Cancel
Save