Browse Source

add configurations of kryo buffer size and use kryo to exclude instance without no-arg constructor

master
yichen 2 years ago
parent
commit
e7a7747852
  1. 3
      service/src/main/java/com/fanruan/ServerStater.java
  2. 1
      service/src/main/java/com/fanruan/annotation/LocalMethod.java
  3. 2
      service/src/main/java/com/fanruan/proxy/interceptor/Interceptor.java
  4. 34
      service/src/main/java/com/fanruan/proxy/interceptor/InterceptorUtils.java
  5. 26
      service/src/main/java/com/fanruan/serializer/KryoSerializer.java
  6. 3
      service/src/main/resources/socketIO.properties
  7. 3
      test/src/test/java/com/fanruan/AbstractDriverTest.java
  8. 47
      test/src/test/java/com/fanruan/ResultSetTest.java
  9. 26
      test/src/test/java/com/fanruan/SavePointTest.java

3
service/src/main/java/com/fanruan/ServerStater.java

@ -151,6 +151,9 @@ public class ServerStater{
config.setPingTimeout(pingTimeout);
config.setPingInterval(pingInterval);
config.setTransports(Transport.WEBSOCKET);
int bufferSize = Integer.parseInt(props.getProperty("bufferSize"));
KryoSerializer.setBufferSize(bufferSize);
in.close();
threadPool = new ThreadPoolExecutor(

1
service/src/main/java/com/fanruan/annotation/LocalMethod.java

@ -3,6 +3,7 @@ package com.fanruan.annotation;
import java.lang.annotation.*;
/**
* Mark local methods not to be called remotely
* @author Yichen Dai
* @date 2022/8/31 18:34
*/

2
service/src/main/java/com/fanruan/proxy/interceptor/Interceptor.java

@ -54,6 +54,8 @@ public class Interceptor implements MethodInterceptor {
}
/**
* Parameters injection of class MyDriver's construction method will be delayed util the first "connect" method was intercepted
* Because Driver Instance is registered on the DriverManager in the static code block,

34
service/src/main/java/com/fanruan/proxy/interceptor/InterceptorUtils.java

@ -1,5 +1,6 @@
package com.fanruan.proxy.interceptor;
import com.esotericsoftware.kryo.Kryo;
import com.fanruan.annotation.*;
import com.fanruan.pojo.message.RpcRequest;
import com.fanruan.service.jdbc.AbstractBind;
@ -10,6 +11,7 @@ import com.fanruan.utils.Commons;
import java.io.Reader;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
@ -31,6 +33,7 @@ public class InterceptorUtils {
}
};
public static final Kryo kryo = new Kryo();
public static boolean isWraps(Object o){
Class<?> clz = o.getClass();
@ -96,7 +99,9 @@ public class InterceptorUtils {
return idToInvoke;
}
public static RpcRequest generateRequest(Class<?> clazz, Object o, Method method, Object[] objects){
public static RpcRequest generateRequest(Class<?> clazz, Object o, Method method, Object[] objects) throws SQLException {
checkObject(method, objects);
RpcRequest rpcRequest = new RpcRequest();
rpcRequest
.setID(Commons.getID())
@ -123,6 +128,29 @@ public class InterceptorUtils {
return rpcRequest;
}
public static void checkObject(Method method, Object[] objects) throws SQLException {
for (int i=0; i<objects.length; i++){
Object obj = objects[i];
if(obj == null){
continue;
}
try{
if(kryo.getDefaultSerializer(obj.getClass()) != null){
continue;
}
}catch (IllegalArgumentException e){
// When kryo hasn't default serializer to handler obj, the IllegalArgumentException will be thrown.
}
try {
obj.getClass().getDeclaredConstructor();
}catch (NoSuchMethodException e){
throw new SQLException("can't invoke " + method.getName() + "dose not have no-arg constructor");
}
}
}
public static boolean isNotImplemented(Method method) {
return method.isAnnotationPresent(NotImplemented.class);
}
@ -131,7 +159,5 @@ public class InterceptorUtils {
return method.isAnnotationPresent(LocalMethod.class);
}
public static boolean isWithBindingParameter(Method method){
return method.isAnnotationPresent(WithBindingParameter.class);
}
public static boolean isWithBindingParameter(Method method){ return method.isAnnotationPresent(WithBindingParameter.class); }
}

26
service/src/main/java/com/fanruan/serializer/KryoSerializer.java

@ -16,6 +16,12 @@ import java.io.IOException;
*/
public class KryoSerializer implements Serializer {
private static int bufferSize = 4096;
public static void setBufferSize(int bufferSize) {
KryoSerializer.bufferSize = bufferSize;
}
private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
final Kryo kryo = new Kryo();
kryo.register(RpcRequest.class);
@ -28,11 +34,11 @@ public class KryoSerializer implements Serializer {
@Override
public byte[] serialize(Object object) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
final Kryo kryo = kryoThreadLocal.get();
kryo.writeObject(output, object);
kryoThreadLocal.remove();
return output.toBytes();
Output output = new Output(byteArrayOutputStream, bufferSize)) {
final Kryo kryo = kryoThreadLocal.get();
kryo.writeObject(output, object);
kryoThreadLocal.remove();
return output.toBytes();
} catch (IOException e) {
e.printStackTrace();
}
@ -42,11 +48,11 @@ public class KryoSerializer implements Serializer {
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
final Kryo kryo = kryoThreadLocal.get();
final Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
Input input = new Input(byteArrayInputStream, bufferSize)) {
final Kryo kryo = kryoThreadLocal.get();
final Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
} catch (IOException e) {
e.printStackTrace();
}

3
service/src/main/resources/socketIO.properties

@ -21,3 +21,6 @@ pingTimeout = 6000000
# Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
pingInterval = 25000
#kryo buffer size
bufferSize = 65536

3
test/src/test/java/com/fanruan/AbstractDriverTest.java

@ -40,9 +40,8 @@ public class AbstractDriverTest {
}
}
void shutDown(){
void shutDownAgent(){
agent.shutDown();
server.shutDown();
}
/**

47
test/src/test/java/com/fanruan/ResultSetTest.java

@ -570,6 +570,53 @@ public class ResultSetTest extends BaseJDBCTest{
closeSQLObjects(statement, prepareStatement);
}
@Test
@Disabled("test arg without no-arg constructor Throw SQLException")
void testUpdatableStream() throws SQLException{
Statement statement = connection.createStatement();
statement.executeUpdate("DROP TABLE IF EXISTS updateStreamTest");
statement.executeUpdate("CREATE TABLE updateStreamTest (keyField INT NOT NULL AUTO_INCREMENT PRIMARY KEY, field1 BLOB)");
int streamLength = 1024;
byte[] streamData = new byte[streamLength];
/* create an updatable statement */
Statement updStmt = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
/* fill the resultset with some values */
ResultSet updRs = updStmt.executeQuery("SELECT * FROM updateStreamTest");
/* move to insertRow */
updRs.moveToInsertRow();
/* update the table */
updRs.updateBinaryStream("field1", new ByteArrayInputStream(streamData), streamLength);
updRs.insertRow();
}
@Test
public void testLargeBlob() throws SQLException, IOException {
Statement statement = connection.createStatement();
statement.execute("create table TestLargeBlob" +
" (testBlob BLOB(10240));"
);
PreparedStatement prepareStatement = connection.prepareStatement("INSERT INTO TestLargeBlob VALUES(?);");
byte[] chuck = new byte[10240];
Blob blob1 = new JDBCBlob(chuck);
prepareStatement.setBlob(1, blob1);
prepareStatement.executeUpdate();
ResultSet resultSet = statement.executeQuery("SELECT * FROM TestLargeBlob");
resultSet.next();
closeSQLObjects(statement, prepareStatement);
}
@AfterAll
void tearDown() throws SQLException{
closeSQLObjects(connection, resultSet);

26
test/src/test/java/com/fanruan/SavePointTest.java

@ -84,30 +84,4 @@ public class SavePointTest extends BaseJDBCTest{
closeSQLObjects(statement);
}
@Test
@Disabled
void testUpdatableStream() throws SQLException{
Statement statement = connection.createStatement();
statement.executeUpdate("DROP TABLE IF EXISTS updateStreamTest");
statement.executeUpdate("CREATE TABLE updateStreamTest (keyField INT NOT NULL AUTO_INCREMENT PRIMARY KEY, field1 BLOB)");
int streamLength = 1024;
byte[] streamData = new byte[streamLength];
/* create an updatable statement */
Statement updStmt = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
/* fill the resultset with some values */
ResultSet updRs = updStmt.executeQuery("SELECT * FROM updateStreamTest");
/* move to insertRow */
updRs.moveToInsertRow();
/* update the table */
updRs.updateBinaryStream("field1", new ByteArrayInputStream(streamData), streamLength);
updRs.insertRow();
}
}

Loading…
Cancel
Save