From e7a77478524ca9706143631622b7f3f26052f52b Mon Sep 17 00:00:00 2001 From: yichen Date: Sun, 18 Sep 2022 11:34:57 +0800 Subject: [PATCH] add configurations of kryo buffer size and use kryo to exclude instance without no-arg constructor --- .../main/java/com/fanruan/ServerStater.java | 3 ++ .../com/fanruan/annotation/LocalMethod.java | 1 + .../proxy/interceptor/Interceptor.java | 2 + .../proxy/interceptor/InterceptorUtils.java | 34 ++++++++++++-- .../fanruan/serializer/KryoSerializer.java | 26 ++++++---- .../src/main/resources/socketIO.properties | 3 ++ .../java/com/fanruan/AbstractDriverTest.java | 3 +- .../test/java/com/fanruan/ResultSetTest.java | 47 +++++++++++++++++++ .../test/java/com/fanruan/SavePointTest.java | 26 ---------- 9 files changed, 103 insertions(+), 42 deletions(-) diff --git a/service/src/main/java/com/fanruan/ServerStater.java b/service/src/main/java/com/fanruan/ServerStater.java index 4f4e099..2742b54 100644 --- a/service/src/main/java/com/fanruan/ServerStater.java +++ b/service/src/main/java/com/fanruan/ServerStater.java @@ -151,6 +151,9 @@ public class ServerStater{ config.setPingTimeout(pingTimeout); config.setPingInterval(pingInterval); config.setTransports(Transport.WEBSOCKET); + + int bufferSize = Integer.parseInt(props.getProperty("bufferSize")); + KryoSerializer.setBufferSize(bufferSize); in.close(); threadPool = new ThreadPoolExecutor( diff --git a/service/src/main/java/com/fanruan/annotation/LocalMethod.java b/service/src/main/java/com/fanruan/annotation/LocalMethod.java index ba4c4ba..382190b 100644 --- a/service/src/main/java/com/fanruan/annotation/LocalMethod.java +++ b/service/src/main/java/com/fanruan/annotation/LocalMethod.java @@ -3,6 +3,7 @@ package com.fanruan.annotation; import java.lang.annotation.*; /** + * Mark local methods not to be called remotely * @author Yichen Dai * @date 2022/8/31 18:34 */ diff --git a/service/src/main/java/com/fanruan/proxy/interceptor/Interceptor.java b/service/src/main/java/com/fanruan/proxy/interceptor/Interceptor.java index d6785b5..7beef07 100644 --- a/service/src/main/java/com/fanruan/proxy/interceptor/Interceptor.java +++ b/service/src/main/java/com/fanruan/proxy/interceptor/Interceptor.java @@ -54,6 +54,8 @@ public class Interceptor implements MethodInterceptor { } + + /** * Parameters injection of class MyDriver's construction method will be delayed util the first "connect" method was intercepted * Because Driver Instance is registered on the DriverManager in the static code block, diff --git a/service/src/main/java/com/fanruan/proxy/interceptor/InterceptorUtils.java b/service/src/main/java/com/fanruan/proxy/interceptor/InterceptorUtils.java index 0c6707d..74a0869 100644 --- a/service/src/main/java/com/fanruan/proxy/interceptor/InterceptorUtils.java +++ b/service/src/main/java/com/fanruan/proxy/interceptor/InterceptorUtils.java @@ -1,5 +1,6 @@ package com.fanruan.proxy.interceptor; +import com.esotericsoftware.kryo.Kryo; import com.fanruan.annotation.*; import com.fanruan.pojo.message.RpcRequest; import com.fanruan.service.jdbc.AbstractBind; @@ -10,6 +11,7 @@ import com.fanruan.utils.Commons; import java.io.Reader; import java.lang.reflect.Method; import java.lang.reflect.Parameter; +import java.sql.SQLException; import java.util.HashMap; import java.util.Map; @@ -31,6 +33,7 @@ public class InterceptorUtils { } }; + public static final Kryo kryo = new Kryo(); public static boolean isWraps(Object o){ Class clz = o.getClass(); @@ -96,7 +99,9 @@ public class InterceptorUtils { return idToInvoke; } - public static RpcRequest generateRequest(Class clazz, Object o, Method method, Object[] objects){ + public static RpcRequest generateRequest(Class clazz, Object o, Method method, Object[] objects) throws SQLException { + checkObject(method, objects); + RpcRequest rpcRequest = new RpcRequest(); rpcRequest .setID(Commons.getID()) @@ -123,6 +128,29 @@ public class InterceptorUtils { return rpcRequest; } + public static void checkObject(Method method, Object[] objects) throws SQLException { + for (int i=0; i kryoThreadLocal = ThreadLocal.withInitial(() -> { final Kryo kryo = new Kryo(); kryo.register(RpcRequest.class); @@ -28,11 +34,11 @@ public class KryoSerializer implements Serializer { @Override public byte[] serialize(Object object) { try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - Output output = new Output(byteArrayOutputStream)) { - final Kryo kryo = kryoThreadLocal.get(); - kryo.writeObject(output, object); - kryoThreadLocal.remove(); - return output.toBytes(); + Output output = new Output(byteArrayOutputStream, bufferSize)) { + final Kryo kryo = kryoThreadLocal.get(); + kryo.writeObject(output, object); + kryoThreadLocal.remove(); + return output.toBytes(); } catch (IOException e) { e.printStackTrace(); } @@ -42,11 +48,11 @@ public class KryoSerializer implements Serializer { @Override public T deserialize(byte[] bytes, Class clazz) { try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); - Input input = new Input(byteArrayInputStream)) { - final Kryo kryo = kryoThreadLocal.get(); - final Object o = kryo.readObject(input, clazz); - kryoThreadLocal.remove(); - return clazz.cast(o); + Input input = new Input(byteArrayInputStream, bufferSize)) { + final Kryo kryo = kryoThreadLocal.get(); + final Object o = kryo.readObject(input, clazz); + kryoThreadLocal.remove(); + return clazz.cast(o); } catch (IOException e) { e.printStackTrace(); } diff --git a/service/src/main/resources/socketIO.properties b/service/src/main/resources/socketIO.properties index 488ceaa..9cc9ca8 100644 --- a/service/src/main/resources/socketIO.properties +++ b/service/src/main/resources/socketIO.properties @@ -21,3 +21,6 @@ pingTimeout = 6000000 # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔 pingInterval = 25000 +#kryo buffer size +bufferSize = 65536 + diff --git a/test/src/test/java/com/fanruan/AbstractDriverTest.java b/test/src/test/java/com/fanruan/AbstractDriverTest.java index 80fab3a..12d3d07 100644 --- a/test/src/test/java/com/fanruan/AbstractDriverTest.java +++ b/test/src/test/java/com/fanruan/AbstractDriverTest.java @@ -40,9 +40,8 @@ public class AbstractDriverTest { } } - void shutDown(){ + void shutDownAgent(){ agent.shutDown(); - server.shutDown(); } /** diff --git a/test/src/test/java/com/fanruan/ResultSetTest.java b/test/src/test/java/com/fanruan/ResultSetTest.java index fde7d0b..46cf12d 100644 --- a/test/src/test/java/com/fanruan/ResultSetTest.java +++ b/test/src/test/java/com/fanruan/ResultSetTest.java @@ -570,6 +570,53 @@ public class ResultSetTest extends BaseJDBCTest{ closeSQLObjects(statement, prepareStatement); } + @Test + @Disabled("test arg without no-arg constructor Throw SQLException") + void testUpdatableStream() throws SQLException{ + Statement statement = connection.createStatement(); + + statement.executeUpdate("DROP TABLE IF EXISTS updateStreamTest"); + statement.executeUpdate("CREATE TABLE updateStreamTest (keyField INT NOT NULL AUTO_INCREMENT PRIMARY KEY, field1 BLOB)"); + + int streamLength = 1024; + byte[] streamData = new byte[streamLength]; + + /* create an updatable statement */ + Statement updStmt = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE); + + /* fill the resultset with some values */ + ResultSet updRs = updStmt.executeQuery("SELECT * FROM updateStreamTest"); + + /* move to insertRow */ + updRs.moveToInsertRow(); + + /* update the table */ + updRs.updateBinaryStream("field1", new ByteArrayInputStream(streamData), streamLength); + + updRs.insertRow(); + } + + @Test + public void testLargeBlob() throws SQLException, IOException { + Statement statement = connection.createStatement(); + + statement.execute("create table TestLargeBlob" + + " (testBlob BLOB(10240));" + ); + + PreparedStatement prepareStatement = connection.prepareStatement("INSERT INTO TestLargeBlob VALUES(?);"); + byte[] chuck = new byte[10240]; + + Blob blob1 = new JDBCBlob(chuck); + prepareStatement.setBlob(1, blob1); + prepareStatement.executeUpdate(); + + ResultSet resultSet = statement.executeQuery("SELECT * FROM TestLargeBlob"); + resultSet.next(); + + closeSQLObjects(statement, prepareStatement); + } + @AfterAll void tearDown() throws SQLException{ closeSQLObjects(connection, resultSet); diff --git a/test/src/test/java/com/fanruan/SavePointTest.java b/test/src/test/java/com/fanruan/SavePointTest.java index 5d1f965..c2e71b6 100644 --- a/test/src/test/java/com/fanruan/SavePointTest.java +++ b/test/src/test/java/com/fanruan/SavePointTest.java @@ -84,30 +84,4 @@ public class SavePointTest extends BaseJDBCTest{ closeSQLObjects(statement); } - - @Test - @Disabled - void testUpdatableStream() throws SQLException{ - Statement statement = connection.createStatement(); - - statement.executeUpdate("DROP TABLE IF EXISTS updateStreamTest"); - statement.executeUpdate("CREATE TABLE updateStreamTest (keyField INT NOT NULL AUTO_INCREMENT PRIMARY KEY, field1 BLOB)"); - - int streamLength = 1024; - byte[] streamData = new byte[streamLength]; - - /* create an updatable statement */ - Statement updStmt = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE); - - /* fill the resultset with some values */ - ResultSet updRs = updStmt.executeQuery("SELECT * FROM updateStreamTest"); - - /* move to insertRow */ - updRs.moveToInsertRow(); - - /* update the table */ - updRs.updateBinaryStream("field1", new ByteArrayInputStream(streamData), streamLength); - - updRs.insertRow(); - } }