diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml index e40b7e1274..27f0923017 100644 --- a/dolphinscheduler-remote/pom.xml +++ b/dolphinscheduler-remote/pom.xml @@ -51,6 +51,19 @@ net.bytebuddy byte-buddy + + + + io.protostuff + protostuff-core + 1.7.2 + + + + io.protostuff + protostuff-runtime + 1.7.2 + junit junit diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index c1aea90393..0092b682e8 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.remote; import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java index 867cf4dc56..56b31cd0b5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.remote; import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; -import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.handler.NettyServerHandler; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java new file mode 100644 index 0000000000..f212b77389 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java @@ -0,0 +1,44 @@ +package org.apache.dolphinscheduler.remote.decoder; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; + +import java.util.List; + +/** + * @author jiangli + * @date 2021-01-12 18:53 + */ +public class NettyDecoder extends ByteToMessageDecoder { + + private Class genericClass; + + // 构造函数传入向反序列化的class + public NettyDecoder(Class genericClass) { + this.genericClass = genericClass; + } + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { + if (byteBuf.readableBytes() < 4) { + return; + } + byteBuf.markReaderIndex(); + int dataLength = byteBuf.readInt(); + if (dataLength < 0) { + channelHandlerContext.close(); + } + if (byteBuf.readableBytes() < dataLength) { + byteBuf.resetReaderIndex(); + } + //将ByteBuf转换为byte[] + byte[] data = new byte[dataLength]; + byteBuf.readBytes(data); + //将data转换成object + Object obj = ProtoStuffUtils.deserialize(data, genericClass); + list.add(obj); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java new file mode 100644 index 0000000000..3ba5781fb4 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java @@ -0,0 +1,32 @@ +package org.apache.dolphinscheduler.remote.decoder; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; + +/** + * @author jiangli + * @date 2021-01-12 18:52 + */ +public class NettyEncoder extends MessageToByteEncoder { + + + private Class genericClass; + + // 构造函数传入向反序列化的class + public NettyEncoder(Class genericClass) { + this.genericClass = genericClass; + } + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { + + if (genericClass.isInstance(o)) { + byte[] data = ProtoStuffUtils.serialize(o); + byteBuf.writeInt(data.length); + byteBuf.writeBytes(data); + } + + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index a988acfe17..701c3b9a09 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -18,8 +18,6 @@ package org.apache.dolphinscheduler.remote.handler; import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java index 09e41e9b54..0e34f99996 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java @@ -18,8 +18,6 @@ package org.apache.dolphinscheduler.remote.handler; import org.apache.dolphinscheduler.remote.NettyRemotingServer; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.Pair; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java new file mode 100644 index 0000000000..4face03f90 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java @@ -0,0 +1,52 @@ +package org.apache.dolphinscheduler.remote.serialize; + +import io.protostuff.LinkedBuffer; +import io.protostuff.ProtostuffIOUtil; +import io.protostuff.Schema; +import io.protostuff.runtime.RuntimeSchema; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author jiangli + * @date 2021-01-12 18:56 + */ +public class ProtoStuffUtils { + + private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); + + private static Map, Schema> schemaCache = new ConcurrentHashMap<>(); + + @SuppressWarnings("unchecked") + public static byte[] serialize(T obj) { + Class clazz = (Class) obj.getClass(); + Schema schema = getSchema(clazz); + byte[] data; + try { + data = ProtostuffIOUtil.toByteArray(obj, schema, buffer); + } finally { + buffer.clear(); + } + return data; + } + + @SuppressWarnings("unchecked") + private static Schema getSchema(Class clazz) { + Schema schema = (Schema) schemaCache.get(clazz); + if (schema == null) { + schema = RuntimeSchema.getSchema(clazz); + if (schema == null) { + schemaCache.put(clazz, schema); + } + } + return schema; + } + + public static T deserialize(byte[] bytes, Class clazz) { + Schema schema = getSchema(clazz); + T obj = schema.newMessage(); + ProtostuffIOUtil.mergeFrom(bytes, obj, schema); + return obj; + } +}