diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java index 0f155e07c7..0ae31c9cbc 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java @@ -22,7 +22,11 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.rpc.base.Rpc; import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.rpc.protocol.EventType; +import org.apache.dolphinscheduler.rpc.protocol.MessageHeader; +import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol; import org.apache.dolphinscheduler.rpc.remote.NettyClient; +import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer; import java.lang.reflect.Method; import java.util.UUID; @@ -57,6 +61,8 @@ public class ConsumerInterceptor { int retries = consumerConfig.getRetries(); + RpcProtocol protocol=buildProtocol(request); + while (retries-- > 0) { RpcResponse rsp = nettyClient.sendMsg(host, request, async); //success @@ -71,7 +77,6 @@ public class ConsumerInterceptor { private RpcRequest buildReq(Object[] args, Method method) { RpcRequest request = new RpcRequest(); - request.setRequestId(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getSimpleName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); @@ -97,4 +102,15 @@ public class ConsumerInterceptor { return consumerConfig; } + private RpcProtocol buildProtocol(RpcRequest req){ + RpcProtocol protocol=new RpcProtocol<>(); + MessageHeader header=new MessageHeader(); + header.setRequestId(RpcRequestTable.getRequestId()); + header.setEventType(EventType.REQUEST.getType()); + header.setSerialization(RpcSerializer.PROTOSTUFF.getType()); + protocol.setMsgHeader(header); + protocol.setBody(req); + return protocol; + } + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java index 0a62a15447..4d47522698 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.rpc.client; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; /** * RpcRequestTable @@ -28,18 +29,24 @@ public class RpcRequestTable { throw new IllegalStateException("Utility class"); } - private static ConcurrentHashMap requestMap = new ConcurrentHashMap<>(); + private static AtomicLong requestIdGen = new AtomicLong(0); - public static void put(String requestId, RpcRequestCache rpcRequestCache) { + private static ConcurrentHashMap requestMap = new ConcurrentHashMap<>(); + + public static void put(long requestId, RpcRequestCache rpcRequestCache) { requestMap.put(requestId, rpcRequestCache); } - public static RpcRequestCache get(String requestId) { + public static RpcRequestCache get(Long requestId) { return requestMap.get(requestId); } - public static void remove(String requestId) { + public static void remove(Long requestId) { requestMap.remove(requestId); } + public static long getRequestId() { + return requestIdGen.incrementAndGet(); + } + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcRequest.java index 9f862d3565..877d40fd0e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcRequest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/RpcRequest.java @@ -22,11 +22,11 @@ package org.apache.dolphinscheduler.rpc.common; */ public class RpcRequest { - private String requestId; private String className; private String methodName; private Class[] parameterTypes; private Object[] parameters; + /** * @see RequestEventType */ @@ -42,14 +42,6 @@ public class RpcRequest { this.eventType = eventType; } - public String getRequestId() { - return requestId; - } - - public void setRequestId(String requestId) { - this.requestId = requestId; - } - public String getClassName() { return className; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java new file mode 100644 index 0000000000..7f9ce6a4c8 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/EventType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.rpc.protocol; + +public enum EventType { + + HEARTBEAT((byte)1,"heartbeat"), + REQUEST((byte)2,"business request"), + RESPONSE((byte)3,"business response"); + + + private Byte type; + + private String description; + + EventType(Byte type, String description) { + this.type = type; + this.description = description; + } + + public Byte getType() { + return type; + } + + public String getDescription() { + return description; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java index 4d1168aa66..858113ac7a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java @@ -21,55 +21,44 @@ public class MessageHeader { private byte magic = (byte) 0xbabe; - /** - * context length - */ - private int contextLength; + private byte version; - /** - * context - */ - private byte[] context; + private byte eventType; - private String requestId; - - - private byte type; + private int msgLength; private byte status; - private byte serialization; + private long requestId; - public int getContextLength() { - return contextLength; - } + private byte serialization; - public void setContextLength(int contextLength) { - this.contextLength = contextLength; + public byte getMagic() { + return magic; } - public byte[] getContext() { - return context; + public byte getVersion() { + return version; } - public void setContext(byte[] context) { - this.context = context; + public void setVersion(byte version) { + this.version = version; } - public String getRequestId() { - return requestId; + public byte getEventType() { + return eventType; } - public void setRequestId(String requestId) { - this.requestId = requestId; + public void setEventType(byte eventType) { + this.eventType = eventType; } - public byte getType() { - return type; + public int getMsgLength() { + return msgLength; } - public void setType(byte type) { - this.type = type; + public void setMsgLength(int msgLength) { + this.msgLength = msgLength; } public byte getStatus() { @@ -80,6 +69,14 @@ public class MessageHeader { this.status = status; } + public long getRequestId() { + return requestId; + } + + public void setRequestId(long requestId) { + this.requestId = requestId; + } + public byte getSerialization() { return serialization; } @@ -87,8 +84,4 @@ public class MessageHeader { public void setSerialization(byte serialization) { this.serialization = serialization; } - - public byte getMagic() { - return magic; - } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java index caabe929f7..9e35b5c0c1 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.rpc.codec.NettyEncoder; import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.future.RpcFuture; +import org.apache.dolphinscheduler.rpc.protocol.RpcProtocol; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; @@ -191,10 +192,11 @@ public class NettyClient { isStarted.compareAndSet(false, true); } - public RpcResponse sendMsg(Host host, RpcRequest request, Boolean async) { + public RpcResponse sendMsg(Host host, RpcProtocol protocol, Boolean async) { Channel channel = getChannel(host); assert channel != null; + RpcRequest request=protocol.getBody(); RpcRequestCache rpcRequestCache = new RpcRequestCache(); String serviceName = request.getClassName() + request.getMethodName(); rpcRequestCache.setServiceName(serviceName); @@ -203,7 +205,7 @@ public class NettyClient { future = new RpcFuture(request); rpcRequestCache.setRpcFuture(future); } - RpcRequestTable.put(request.getRequestId(), rpcRequestCache); + RpcRequestTable.put(protocol.getMsgHeader().getRequestId(), rpcRequestCache); channel.writeAndFlush(request); RpcResponse result = null; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java index 1ea4ab80be..5c29d774a9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java @@ -31,6 +31,10 @@ public enum RpcSerializer { this.serializer = serializer; } + public byte getType() { + return type; + } + private static HashMap SERIALIZERS_MAP = new HashMap<>(); static {