CalvinKirs
4 years ago
8 changed files with 141 additions and 7 deletions
@ -0,0 +1,44 @@
|
||||
package org.apache.dolphinscheduler.remote.decoder; |
||||
|
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.channel.ChannelHandlerContext; |
||||
import io.netty.handler.codec.ByteToMessageDecoder; |
||||
|
||||
import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; |
||||
|
||||
import java.util.List; |
||||
|
||||
/** |
||||
* @author jiangli |
||||
* @date 2021-01-12 18:53 |
||||
*/ |
||||
public class NettyDecoder extends ByteToMessageDecoder { |
||||
|
||||
private Class<?> genericClass; |
||||
|
||||
// 构造函数传入向反序列化的class
|
||||
public NettyDecoder(Class<?> genericClass) { |
||||
this.genericClass = genericClass; |
||||
} |
||||
|
||||
@Override |
||||
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { |
||||
if (byteBuf.readableBytes() < 4) { |
||||
return; |
||||
} |
||||
byteBuf.markReaderIndex(); |
||||
int dataLength = byteBuf.readInt(); |
||||
if (dataLength < 0) { |
||||
channelHandlerContext.close(); |
||||
} |
||||
if (byteBuf.readableBytes() < dataLength) { |
||||
byteBuf.resetReaderIndex(); |
||||
} |
||||
//将ByteBuf转换为byte[]
|
||||
byte[] data = new byte[dataLength]; |
||||
byteBuf.readBytes(data); |
||||
//将data转换成object
|
||||
Object obj = ProtoStuffUtils.deserialize(data, genericClass); |
||||
list.add(obj); |
||||
} |
||||
} |
@ -0,0 +1,32 @@
|
||||
package org.apache.dolphinscheduler.remote.decoder; |
||||
|
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.channel.ChannelHandlerContext; |
||||
import io.netty.handler.codec.MessageToByteEncoder; |
||||
|
||||
import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; |
||||
|
||||
/** |
||||
* @author jiangli |
||||
* @date 2021-01-12 18:52 |
||||
*/ |
||||
public class NettyEncoder extends MessageToByteEncoder { |
||||
|
||||
|
||||
private Class<?> genericClass; |
||||
|
||||
// 构造函数传入向反序列化的class
|
||||
public NettyEncoder(Class<?> genericClass) { |
||||
this.genericClass = genericClass; |
||||
} |
||||
@Override |
||||
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { |
||||
|
||||
if (genericClass.isInstance(o)) { |
||||
byte[] data = ProtoStuffUtils.serialize(o); |
||||
byteBuf.writeInt(data.length); |
||||
byteBuf.writeBytes(data); |
||||
} |
||||
|
||||
} |
||||
} |
@ -0,0 +1,52 @@
|
||||
package org.apache.dolphinscheduler.remote.serialize; |
||||
|
||||
import io.protostuff.LinkedBuffer; |
||||
import io.protostuff.ProtostuffIOUtil; |
||||
import io.protostuff.Schema; |
||||
import io.protostuff.runtime.RuntimeSchema; |
||||
|
||||
import java.util.Map; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
/** |
||||
* @author jiangli |
||||
* @date 2021-01-12 18:56 |
||||
*/ |
||||
public class ProtoStuffUtils { |
||||
|
||||
private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); |
||||
|
||||
private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>(); |
||||
|
||||
@SuppressWarnings("unchecked") |
||||
public static <T> byte[] serialize(T obj) { |
||||
Class<T> clazz = (Class<T>) obj.getClass(); |
||||
Schema<T> schema = getSchema(clazz); |
||||
byte[] data; |
||||
try { |
||||
data = ProtostuffIOUtil.toByteArray(obj, schema, buffer); |
||||
} finally { |
||||
buffer.clear(); |
||||
} |
||||
return data; |
||||
} |
||||
|
||||
@SuppressWarnings("unchecked") |
||||
private static <T> Schema<T> getSchema(Class<T> clazz) { |
||||
Schema<T> schema = (Schema<T>) schemaCache.get(clazz); |
||||
if (schema == null) { |
||||
schema = RuntimeSchema.getSchema(clazz); |
||||
if (schema == null) { |
||||
schemaCache.put(clazz, schema); |
||||
} |
||||
} |
||||
return schema; |
||||
} |
||||
|
||||
public static <T> T deserialize(byte[] bytes, Class<T> clazz) { |
||||
Schema<T> schema = getSchema(clazz); |
||||
T obj = schema.newMessage(); |
||||
ProtostuffIOUtil.mergeFrom(bytes, obj, schema); |
||||
return obj; |
||||
} |
||||
} |
Loading…
Reference in new issue