Browse Source

rpc protocol

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
54ca216abf
  1. 18
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java
  2. 15
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java
  3. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcRequest.java
  4. 43
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java
  5. 61
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java
  6. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
  7. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java

18
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java

@ -22,7 +22,11 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.rpc.base.Rpc; import org.apache.dolphinscheduler.rpc.base.Rpc;
import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.protocol.EventType;
import org.apache.dolphinscheduler.rpc.protocol.MessageHeader;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import org.apache.dolphinscheduler.rpc.remote.NettyClient; import org.apache.dolphinscheduler.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.UUID; import java.util.UUID;
@ -57,6 +61,8 @@ public class ConsumerInterceptor {
int retries = consumerConfig.getRetries(); int retries = consumerConfig.getRetries();
RpcProtocol protocol=buildProtocol(request);
while (retries-- > 0) { while (retries-- > 0) {
RpcResponse rsp = nettyClient.sendMsg(host, request, async); RpcResponse rsp = nettyClient.sendMsg(host, request, async);
//success //success
@ -71,7 +77,6 @@ public class ConsumerInterceptor {
private RpcRequest buildReq(Object[] args, Method method) { private RpcRequest buildReq(Object[] args, Method method) {
RpcRequest request = new RpcRequest(); RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getSimpleName()); request.setClassName(method.getDeclaringClass().getSimpleName());
request.setMethodName(method.getName()); request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes()); request.setParameterTypes(method.getParameterTypes());
@ -97,4 +102,15 @@ public class ConsumerInterceptor {
return consumerConfig; return consumerConfig;
} }
private RpcProtocol buildProtocol(RpcRequest req){
RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
MessageHeader header=new MessageHeader();
header.setRequestId(RpcRequestTable.getRequestId());
header.setEventType(EventType.REQUEST.getType());
header.setSerialization(RpcSerializer.PROTOSTUFF.getType());
protocol.setMsgHeader(header);
protocol.setBody(req);
return protocol;
}
} }

15
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.rpc.client; package org.apache.dolphinscheduler.rpc.client;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* RpcRequestTable * RpcRequestTable
@ -28,18 +29,24 @@ public class RpcRequestTable {
throw new IllegalStateException("Utility class"); throw new IllegalStateException("Utility class");
} }
private static ConcurrentHashMap<String, RpcRequestCache> requestMap = new ConcurrentHashMap<>(); private static AtomicLong requestIdGen = new AtomicLong(0);
public static void put(String requestId, RpcRequestCache rpcRequestCache) { private static ConcurrentHashMap<Long, RpcRequestCache> requestMap = new ConcurrentHashMap<>();
public static void put(long requestId, RpcRequestCache rpcRequestCache) {
requestMap.put(requestId, rpcRequestCache); requestMap.put(requestId, rpcRequestCache);
} }
public static RpcRequestCache get(String requestId) { public static RpcRequestCache get(Long requestId) {
return requestMap.get(requestId); return requestMap.get(requestId);
} }
public static void remove(String requestId) { public static void remove(Long requestId) {
requestMap.remove(requestId); requestMap.remove(requestId);
} }
public static long getRequestId() {
return requestIdGen.incrementAndGet();
}
} }

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcRequest.java

@ -22,11 +22,11 @@ package org.apache.dolphinscheduler.rpc.common;
*/ */
public class RpcRequest { public class RpcRequest {
private String requestId;
private String className; private String className;
private String methodName; private String methodName;
private Class<?>[] parameterTypes; private Class<?>[] parameterTypes;
private Object[] parameters; private Object[] parameters;
/** /**
* @see RequestEventType * @see RequestEventType
*/ */
@ -42,14 +42,6 @@ public class RpcRequest {
this.eventType = eventType; this.eventType = eventType;
} }
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getClassName() { public String getClassName() {
return className; return className;
} }

43
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.rpc.protocol;
public enum EventType {
HEARTBEAT((byte)1,"heartbeat"),
REQUEST((byte)2,"business request"),
RESPONSE((byte)3,"business response");
private Byte type;
private String description;
EventType(Byte type, String description) {
this.type = type;
this.description = description;
}
public Byte getType() {
return type;
}
public String getDescription() {
return description;
}
}

61
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java

@ -21,55 +21,44 @@ public class MessageHeader {
private byte magic = (byte) 0xbabe; private byte magic = (byte) 0xbabe;
/** private byte version;
* context length
*/
private int contextLength;
/**
* context
*/
private byte[] context;
private String requestId; private byte eventType;
private int msgLength;
private byte type;
private byte status; private byte status;
private byte serialization; private long requestId;
public int getContextLength() { private byte serialization;
return contextLength;
}
public void setContextLength(int contextLength) { public byte getMagic() {
this.contextLength = contextLength; return magic;
} }
public byte[] getContext() { public byte getVersion() {
return context; return version;
} }
public void setContext(byte[] context) { public void setVersion(byte version) {
this.context = context; this.version = version;
} }
public String getRequestId() { public byte getEventType() {
return requestId; return eventType;
} }
public void setRequestId(String requestId) { public void setEventType(byte eventType) {
this.requestId = requestId; this.eventType = eventType;
} }
public byte getType() { public int getMsgLength() {
return type; return msgLength;
} }
public void setType(byte type) { public void setMsgLength(int msgLength) {
this.type = type; this.msgLength = msgLength;
} }
public byte getStatus() { public byte getStatus() {
@ -80,6 +69,14 @@ public class MessageHeader {
this.status = status; this.status = status;
} }
public long getRequestId() {
return requestId;
}
public void setRequestId(long requestId) {
this.requestId = requestId;
}
public byte getSerialization() { public byte getSerialization() {
return serialization; return serialization;
} }
@ -87,8 +84,4 @@ public class MessageHeader {
public void setSerialization(byte serialization) { public void setSerialization(byte serialization) {
this.serialization = serialization; this.serialization = serialization;
} }
public byte getMagic() {
return magic;
}
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.rpc.codec.NettyEncoder;
import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.rpc.future.RpcFuture; import org.apache.dolphinscheduler.rpc.future.RpcFuture;
import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -191,10 +192,11 @@ public class NettyClient {
isStarted.compareAndSet(false, true); isStarted.compareAndSet(false, true);
} }
public RpcResponse sendMsg(Host host, RpcRequest request, Boolean async) { public RpcResponse sendMsg(Host host, RpcProtocol<RpcRequest> protocol, Boolean async) {
Channel channel = getChannel(host); Channel channel = getChannel(host);
assert channel != null; assert channel != null;
RpcRequest request=protocol.getBody();
RpcRequestCache rpcRequestCache = new RpcRequestCache(); RpcRequestCache rpcRequestCache = new RpcRequestCache();
String serviceName = request.getClassName() + request.getMethodName(); String serviceName = request.getClassName() + request.getMethodName();
rpcRequestCache.setServiceName(serviceName); rpcRequestCache.setServiceName(serviceName);
@ -203,7 +205,7 @@ public class NettyClient {
future = new RpcFuture(request); future = new RpcFuture(request);
rpcRequestCache.setRpcFuture(future); rpcRequestCache.setRpcFuture(future);
} }
RpcRequestTable.put(request.getRequestId(), rpcRequestCache); RpcRequestTable.put(protocol.getMsgHeader().getRequestId(), rpcRequestCache);
channel.writeAndFlush(request); channel.writeAndFlush(request);
RpcResponse result = null; RpcResponse result = null;

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java

@ -31,6 +31,10 @@ public enum RpcSerializer {
this.serializer = serializer; this.serializer = serializer;
} }
public byte getType() {
return type;
}
private static HashMap<Byte, Serializer> SERIALIZERS_MAP = new HashMap<>(); private static HashMap<Byte, Serializer> SERIALIZERS_MAP = new HashMap<>();
static { static {

Loading…
Cancel
Save