Browse Source

add rpc protocol

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
0f6228457a
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java
  2. 9
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java
  3. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java
  4. 95
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java
  5. 41
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocol.java
  6. 62
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffSerializer.java
  7. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffUtils.java
  8. 45
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java
  9. 28
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/Serializer.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java

@ -40,7 +40,7 @@ public class ConsumerInterceptor {
private NettyClient nettyClient = NettyClient.getInstance();
public ConsumerInterceptor(Host host) {
ConsumerInterceptor(Host host) {
this.host = host;
}

9
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyDecoder.java

@ -17,7 +17,9 @@
package org.apache.dolphinscheduler.rpc.codec;
import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils;
import org.apache.dolphinscheduler.rpc.serializer.ProtoStuffUtils;
import org.apache.dolphinscheduler.rpc.serializer.RpcSerializer;
import org.apache.dolphinscheduler.rpc.serializer.Serializer;
import java.util.List;
@ -49,9 +51,12 @@ public class NettyDecoder extends ByteToMessageDecoder {
if (byteBuf.readableBytes() < dataLength) {
byteBuf.resetReaderIndex();
}
byte serializerType=1;
byte[] data = new byte[dataLength];
byteBuf.readBytes(data);
Object obj = ProtoStuffUtils.deserialize(data, genericClass);
Serializer serializer=RpcSerializer.getSerializerByType(serializerType);
Object obj = serializer.deserialize(data, genericClass);
list.add(obj);
}
}

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/codec/NettyEncoder.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.rpc.codec;
import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils;
import org.apache.dolphinscheduler.rpc.serializer.ProtoStuffUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

95
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/MessageHeader.java

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.rpc.protocol;
public class MessageHeader {
private byte magic=(byte) 0xbabe;
/**
* context length
*/
private int contextLength;
/**
* context
*/
private byte[] context;
private String requestId;
private byte type;
private byte status;
private byte serialization;
public int getContextLength() {
return contextLength;
}
public void setContextLength(int contextLength) {
this.contextLength = contextLength;
}
public byte[] getContext() {
return context;
}
public void setContext(byte[] context) {
this.context = context;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public byte getStatus() {
return status;
}
public void setStatus(byte status) {
this.status = status;
}
public byte getSerialization() {
return serialization;
}
public void setSerialization(byte serialization) {
this.serialization = serialization;
}
public byte getMagic() {
return magic;
}
}

41
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/protocol/RpcProtocol.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.rpc.protocol;
public class RpcProtocol<T>{
private MessageHeader msgHeader;
private T body;
public MessageHeader getMsgHeader() {
return msgHeader;
}
public void setMsgHeader(MessageHeader msgHeader) {
this.msgHeader = msgHeader;
}
public T getBody() {
return body;
}
public void setBody(T body) {
this.body = body;
}
}

62
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffSerializer.java

@ -0,0 +1,62 @@
package org.apache.dolphinscheduler.rpc.serializer;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
public class ProtoStuffSerializer implements Serializer{
private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();
@SuppressWarnings("unchecked")
private static <T> Schema<T> getSchema(Class<T> clazz) {
return (Schema<T>) schemaCache.computeIfAbsent(clazz, RuntimeSchema::createFrom);
}
@Override
public <T> byte[] serialize(T obj) throws IOException {
Class<T> clazz = (Class<T>) obj.getClass();
Schema<T> schema = getSchema(clazz);
byte[] data;
try {
data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
buffer.clear();
}
return data;
}
@Override
public <T> T deserialize(byte[] data, Class<T> clz) throws IOException {
Schema<T> schema = getSchema(clz);
T obj = schema.newMessage();
if (null == obj) {
return null;
}
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
}
}

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/ProtoStuffUtils.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.serialize;
package org.apache.dolphinscheduler.rpc.serializer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

45
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/RpcSerializer.java

@ -0,0 +1,45 @@
package org.apache.dolphinscheduler.rpc.serializer;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.HashMap;
public enum RpcSerializer {
PROTOSTUFF((byte) 1, new ProtoStuffSerializer());
byte type;
Serializer serializer;
RpcSerializer(byte type, Serializer serializer) {
this.type = type;
this.serializer = serializer;
}
private static HashMap<Byte, Serializer> SERIALIZERS_MAP = new HashMap<>();
static {
for (RpcSerializer rpcSerializer : RpcSerializer.values()) {
SERIALIZERS_MAP.put(rpcSerializer.type, rpcSerializer.serializer);
}
}
public static Serializer getSerializerByType(byte type) {
return SERIALIZERS_MAP.get(type);
}
}

28
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/serializer/Serializer.java

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.rpc.serializer;
import java.io.IOException;
public interface Serializer {
<T> byte[] serialize(T obj) throws IOException;
<T> T deserialize(byte[] data, Class<T> clz) throws IOException;
}
Loading…
Cancel
Save