Browse Source

fix the method with bindingParameter invoke error

pull/1/head
yichen 2 years ago
parent
commit
29275d46b3
  1. 34
      agent/src/main/java/com/fanruan/AgentStarter.java
  2. 8
      agent/src/main/java/com/fanruan/agent/jdbc/AgentSavepoint.java
  3. 14
      agent/src/main/java/com/fanruan/agent/jdbc/Parametrical.java
  4. 8
      agent/src/main/java/com/fanruan/agent/jdbc/connection/AgentConnection.java
  5. 13
      agent/src/main/java/com/fanruan/annotation/BindingParamter.java
  6. 13
      agent/src/main/java/com/fanruan/annotation/WithBindingParameter.java
  7. 26
      agent/src/main/java/com/fanruan/handler/DispatcherHelper.java
  8. 4
      agent/src/main/java/com/fanruan/handler/DispatcherImpl.java
  9. 15
      service/src/main/java/com/fanruan/annotation/BindArg.java
  10. 13
      service/src/main/java/com/fanruan/annotation/BindingParamter.java
  11. 13
      service/src/main/java/com/fanruan/annotation/WithBindingParameter.java
  12. 20
      service/src/main/java/com/fanruan/proxy/interceptor/Interceptor.java
  13. 78
      service/src/main/java/com/fanruan/proxy/interceptor/InterceptorUtils.java
  14. 1
      service/src/main/java/com/fanruan/serializer/KryoSerializer.java
  15. 29
      service/src/main/java/com/fanruan/service/jdbc/connection/ServiceConnection.java
  16. 31
      test/src/test/java/com/fanruan/ConnectionTest.java
  17. 1
      test/src/test/java/com/fanruan/InterceptorIT.java
  18. 78
      test/src/test/java/com/fanruan/SavePointTest.java

34
agent/src/main/java/com/fanruan/AgentStarter.java

@ -12,10 +12,10 @@ import okhttp3.OkHttpClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@ -43,14 +43,14 @@ public class AgentStarter {
}
}
private void createSocket(String[][] DBs) throws IOException {
private void createSocket(String[][] DBs){
logger.debug("加载配置");
IO.Options options = new IO.Options();
try{
InputStream in = this.getClass().getResourceAsStream("/socket.properties");
Properties props = new Properties();
InputStreamReader inputStreamReader = new InputStreamReader(in, "UTF-8");
InputStreamReader inputStreamReader = new InputStreamReader(in, StandardCharsets.UTF_8);
props.load(inputStreamReader);
options.transports = new String[]{WebSocket.NAME};
@ -101,14 +101,14 @@ public class AgentStarter {
}
}
private void configDefaultSocket(Socket socket) throws IOException {
socket.on(Socket.EVENT_CONNECT, objects -> {
logger.info("default-socket connected!");
});
private void configDefaultSocket(Socket socket){
socket.on(Socket.EVENT_CONNECT, (objects) ->
logger.info("default-socket connected!")
);
socket.on(Socket.EVENT_CONNECT_ERROR, objects -> {
logger.info("default-socket error: " + objects[0].toString());
});
socket.on(Socket.EVENT_CONNECT_ERROR, objects ->
logger.info("default-socket error: " + objects[0].toString())
);
socket.on(Socket.EVENT_DISCONNECT, objects -> {
for(Object obj : objects){
@ -117,10 +117,10 @@ public class AgentStarter {
});
}
private void configSocket(Socket socket, String dbName) throws IOException {
socket.on(Socket.EVENT_CONNECT, objects -> {
logger.info(dbName + "-socket connected!");
});
private void configSocket(Socket socket, String dbName){
socket.on(Socket.EVENT_CONNECT, objects ->
logger.info(dbName + "-socket connected!")
);
socket.on(Socket.EVENT_DISCONNECT, objects -> {
for(Object obj : objects){
@ -128,9 +128,9 @@ public class AgentStarter {
}
});
socket.on(Socket.EVENT_CONNECT_ERROR, objects -> {
logger.info(dbName + "-socket error: " + objects[0].toString());
});
socket.on(Socket.EVENT_CONNECT_ERROR, objects ->
logger.info(dbName + "-socket error: " + objects[0].toString())
);
socket.on("RPCRequest", objects -> {
RpcRequest rpcRequest = SERIALIZER.deserialize((byte[]) objects[0], RpcRequest.class);

8
agent/src/main/java/com/fanruan/agent/jdbc/AgentSavepoint.java

@ -10,7 +10,8 @@ import java.sql.Savepoint;
* @date 2022/9/9 16:39
*/
@BindClass
public class AgentSavepoint implements Savepoint {
public class AgentSavepoint implements Savepoint, Parametrical{
Savepoint savepoint;
public AgentSavepoint(Savepoint savepoint){
@ -26,4 +27,9 @@ public class AgentSavepoint implements Savepoint {
public String getSavepointName() throws SQLException {
return savepoint.getSavepointName();
}
@Override
public Object getHandler() {
return this.savepoint;
}
}

14
agent/src/main/java/com/fanruan/agent/jdbc/Parametrical.java

@ -0,0 +1,14 @@
package com.fanruan.agent.jdbc;
/**
* @author Yichen Dai
* @date 2022/9/15 11:30
*/
public interface Parametrical {
/**
* get wrapped handler from Agent SQL class
* @return InnerHandler, like connection filed for AgentConnection.
*/
public Object getHandler();
}

8
agent/src/main/java/com/fanruan/agent/jdbc/connection/AgentConnection.java

@ -5,6 +5,8 @@ import com.fanruan.agent.jdbc.statement.AgentCallableStatement;
import com.fanruan.agent.jdbc.statement.AgentPreparedStatement;
import com.fanruan.agent.jdbc.statement.AgentStatement;
import com.fanruan.annotation.BindClass;
import com.fanruan.annotation.BindingParamter;
import com.fanruan.annotation.WithBindingParameter;
import java.sql.*;
import java.util.Map;
@ -166,12 +168,14 @@ public class AgentConnection implements Connection {
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
@WithBindingParameter
public void rollback(@BindingParamter Savepoint savepoint) throws SQLException {
conn.rollback(savepoint);
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
@WithBindingParameter
public void releaseSavepoint(@BindingParamter Savepoint savepoint) throws SQLException {
conn.releaseSavepoint(savepoint);
}

13
agent/src/main/java/com/fanruan/annotation/BindingParamter.java

@ -0,0 +1,13 @@
package com.fanruan.annotation;
import java.lang.annotation.*;
/**
* @author Yichen Dai
* @date 2022/9/15 10:14
*/
@Inherited
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface BindingParamter {
}

13
agent/src/main/java/com/fanruan/annotation/WithBindingParameter.java

@ -0,0 +1,13 @@
package com.fanruan.annotation;
import java.lang.annotation.*;
/**
* @author Yichen Dai
* @date 2022/9/15 10:14
*/
@Inherited
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface WithBindingParameter {
}

26
agent/src/main/java/com/fanruan/handler/DispatcherHelper.java

@ -1,11 +1,17 @@
package com.fanruan.handler;
import com.fanruan.agent.jdbc.Parametrical;
import com.fanruan.annotation.BindClass;
import com.fanruan.annotation.BindingParamter;
import com.fanruan.annotation.WithBindingParameter;
import com.fanruan.cache.BeanCacheImpl;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
* Some utils for MyDispatcher
@ -31,10 +37,18 @@ public class DispatcherHelper {
return false;
}
Class<?> clazz = obj.getClass();
if(clazz.isAnnotationPresent(BindClass.class)){
return true;
return clazz.isAnnotationPresent(BindClass.class);
}
public static void replaceBindingParameter(BeanCacheImpl beanCache, Method method, Object[] args) throws SQLException {
Parameter[] pars = method.getParameters();
for(int i=0; i<pars.length; i++){
Parameter par = pars[i];
if(par.isAnnotationPresent(BindingParamter.class)){
Parametrical obj = (Parametrical) beanCache.getCachedInstances((String)args[i], par.getType());
args[i] = obj.getHandler();
}
}
return false;
}
public static boolean isWraps(Class<?> clz){
@ -56,4 +70,8 @@ public class DispatcherHelper {
}
return arr[n-1];
}
public static boolean isWithBindingParameter(Method method){
return method.isAnnotationPresent(WithBindingParameter.class);
}
}

4
agent/src/main/java/com/fanruan/handler/DispatcherImpl.java

@ -81,6 +81,10 @@ public class DispatcherImpl implements Dispatcher{
method = clazz.getDeclaredMethod(methodName, argTypes);
}
if (DispatcherHelper.isWithBindingParameter(method)){
DispatcherHelper.replaceBindingParameter(beanCache, method, args);
}
Object res = null;
try{

15
service/src/main/java/com/fanruan/annotation/BindArg.java

@ -1,15 +0,0 @@
package com.fanruan.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author Yichen Dai
* @date 2022/9/14 11:30
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface BindArg {
}

13
service/src/main/java/com/fanruan/annotation/BindingParamter.java

@ -0,0 +1,13 @@
package com.fanruan.annotation;
import java.lang.annotation.*;
/**
* @author Yichen Dai
* @date 2022/9/15 10:14
*/
@Inherited
@Target({ElementType.PARAMETER})
@Retention(RetentionPolicy.RUNTIME)
public @interface BindingParamter {
}

13
service/src/main/java/com/fanruan/annotation/WithBindingParameter.java

@ -0,0 +1,13 @@
package com.fanruan.annotation;
import java.lang.annotation.*;
/**
* @author Yichen Dai
* @date 2022/9/15 10:14
*/
@Inherited
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface WithBindingParameter {
}

20
service/src/main/java/com/fanruan/proxy/interceptor/Interceptor.java

@ -48,9 +48,16 @@ public class Interceptor implements MethodInterceptor {
InterceptorUtils.isLocalMethod(method)){
return methodProxy.invokeSuper(o, objects);
}
// Parameters injection of class MyDriver's construction method will be delayed util the first "connect" method was intercepted
// Because Driver Instance is registered on the DriverManager in the static code block,
// in which, the parameters used to fetch socket in cache is hard to pass in.
if(InterceptorUtils.isWithBindingParameter(method)){
InterceptorUtils.replaceBindingParameter(method, objects);
}
/**
* Parameters injection of class MyDriver's construction method will be delayed util the first "connect" method was intercepted
* Because Driver Instance is registered on the DriverManager in the static code block,
* in which, the parameters used to fetch socket in cache is hard to pass in.
*/
if(info == null){
info = (Properties) objects[1];
}
@ -61,6 +68,7 @@ public class Interceptor implements MethodInterceptor {
}
logger.debug("start invoke " + method.getName());
RpcRequest rpcRequest = InterceptorUtils.generateRequest(clazz, o, method, objects);
FutureTask<Object> futureTask = new FutureTask<>(
@ -88,8 +96,12 @@ public class Interceptor implements MethodInterceptor {
ServerStater.threadPool.submit(futureTask);
Object res = futureTask.get();
Object returnObj = null;
// get return value of original method
Object returnObj = methodProxy.invokeSuper(o, objects);
if(!InterceptorUtils.isWithBindingParameter(method)){
returnObj = methodProxy.invokeSuper(o, objects);
}
// res is not null, it indicates the response carries data.
// if the type of res is primitive type, An error will occur when using cast(), just return them directly.

78
service/src/main/java/com/fanruan/proxy/interceptor/InterceptorUtils.java

@ -1,8 +1,6 @@
package com.fanruan.proxy.interceptor;
import com.fanruan.annotation.LocalMethod;
import com.fanruan.annotation.NotImplemented;
import com.fanruan.annotation.RemoteClass;
import com.fanruan.annotation.*;
import com.fanruan.pojo.message.RpcRequest;
import com.fanruan.service.jdbc.AbstractBind;
import com.fanruan.service.jdbc.ServiceInputStream;
@ -10,34 +8,38 @@ import com.fanruan.service.jdbc.ServiceReader;
import com.fanruan.utils.Commons;
import java.io.Reader;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.regex.Pattern;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.Map;
/**
* @author Yichen Dai
*/
public class InterceptorUtils {
private static final String[] WRAPPER_CLASS_LIST = new String[]{
"Boolean",
"Integer",
"Double",
"Long",
"Character",
"Byte",
"Short",
"Float"
private static final Map<String, Class<?>> WRAPPER_CLASS_MAP = new HashMap<String, Class<?>>(){
{
put("Integer", Integer.TYPE);
put("Short", Short.TYPE);
put("Long", Long.TYPE);
put("Double", Double.TYPE);
put("Float", Float.TYPE);
put("Byte", Byte.TYPE);
put("Character", Character.TYPE);
put("Boolean", Boolean.TYPE);
}
};
public static boolean isWraps(Object o){
for(String ex : WRAPPER_CLASS_LIST){
if(ex.equals(getClassName(o.getClass().getName()))){
return true;
}
}
Class<?> clz = o.getClass();
if(clz == null) {
return false;
}
return WRAPPER_CLASS_MAP.containsKey(getClassName(clz.getName()));
}
public static String getClassName(String fullyQualifiedClassName){
String[] arr = fullyQualifiedClassName.split("\\.");
@ -53,10 +55,17 @@ public class InterceptorUtils {
return false;
}
Class<?> clazz = o.getClass();
if(clazz.isAnnotationPresent(RemoteClass.class)){
return true;
return clazz.isAnnotationPresent(RemoteClass.class);
}
public static void replaceBindingParameter(Method method, Object[] args){
Parameter[] pars = method.getParameters();
for(int i=0; i<pars.length; i++){
Parameter par = pars[i];
if(par.isAnnotationPresent(BindingParamter.class) && isInBindList(args[i])){
args[i] = getBindID(args[i]);
}
}
return false;
}
public static void setBindID(Object returnObj, String ID){
@ -88,6 +97,10 @@ public class InterceptorUtils {
}
public static RpcRequest generateRequest(Class<?> clazz, Object o, Method method, Object[] objects){
if(isWithBindingParameter(method)){
replaceBindingParameter(method, objects);
}
RpcRequest rpcRequest = new RpcRequest();
rpcRequest
.setID(Commons.getID())
@ -114,17 +127,24 @@ public class InterceptorUtils {
return rpcRequest;
}
public static boolean isNotImplemented(Method method) {
if(method.isAnnotationPresent(NotImplemented.class)){
return true;
public static Class<?>[] getArgTypes(Object[] objects){
int n = objects.length;
Class<?>[] argTypes = new Class<?>[n];
for(int i=0; i<n; i++){
argTypes[i] = objects[i].getClass();
}
return false;
return argTypes;
}
public static boolean isNotImplemented(Method method) {
return method.isAnnotationPresent(NotImplemented.class);
}
public static boolean isLocalMethod(Method method) {
if(method.isAnnotationPresent(LocalMethod.class)){
return true;
return method.isAnnotationPresent(LocalMethod.class);
}
return false;
public static boolean isWithBindingParameter(Method method){
return method.isAnnotationPresent(WithBindingParameter.class);
}
}

1
service/src/main/java/com/fanruan/serializer/KryoSerializer.java

@ -3,6 +3,7 @@ package com.fanruan.serializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.TaggedFieldSerializer;
import com.fanruan.pojo.message.RpcRequest;
import com.fanruan.pojo.message.RpcResponse;

29
service/src/main/java/com/fanruan/service/jdbc/connection/ServiceConnection.java

@ -1,10 +1,7 @@
package com.fanruan.service.jdbc.connection;
import com.fanruan.annotation.BindArg;
import com.fanruan.annotation.LocalMethod;
import com.fanruan.annotation.NotImplemented;
import com.fanruan.annotation.RemoteClass;
import com.fanruan.annotation.*;
import com.fanruan.service.jdbc.*;
import com.fanruan.service.jdbc.statement.ServiceCallableStatement;
import com.fanruan.service.jdbc.statement.ServicePreparedStatement;
@ -68,10 +65,7 @@ public class ServiceConnection extends BasedBind implements Connection {
}
@Override
@LocalMethod
@NotImplemented
public void rollback() throws SQLException {
throw new SQLException("Not Implemented!");
}
@Override
@ -176,31 +170,24 @@ public class ServiceConnection extends BasedBind implements Connection {
}
@Override
@LocalMethod
@NotImplemented
public Savepoint setSavepoint() throws SQLException {
throw new SQLException("Not Implemented!");
return (Savepoint) ProxyFactory.getProxy(ServiceSavepoint.class, info);
}
@Override
@LocalMethod
@NotImplemented
public Savepoint setSavepoint(String name) throws SQLException {
throw new SQLException("Not Implemented!");
return (Savepoint) ProxyFactory.getProxy(ServiceSavepoint.class, info);
}
@Override
@LocalMethod
@NotImplemented
public void rollback(Savepoint savepoint) throws SQLException {
throw new SQLException("Not Implemented!");
@WithBindingParameter
public void rollback(@BindingParamter Savepoint savepoint) throws SQLException {
}
@Override
@LocalMethod
@NotImplemented
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
throw new SQLException("Not Implemented!");
@WithBindingParameter
public void releaseSavepoint(@BindingParamter Savepoint savepoint) throws SQLException {
}
@Override

31
test/src/test/java/com/fanruan/ConnectionTest.java

@ -253,37 +253,6 @@ public class ConnectionTest extends BaseJDBCTest{
closeSQLObjects(cst1, cst2);
}
@Test
void testSavePoint() throws SQLException {
Assertions.assertThrows(SQLException.class, () -> {
serviceConnection.setSavepoint();
RpcRequest request = map.get(null);
request.setIDToInvoke(IDtoInvoke);
dispatcher.invokeAsRequest(request, beanCache);
});
Assertions.assertThrows(SQLException.class, () -> {
serviceConnection.setSavepoint("savePoint");
RpcRequest request = map.get(null);
request.setIDToInvoke(IDtoInvoke);
dispatcher.invokeAsRequest(request, beanCache);
});
Assertions.assertThrows(SQLException.class, () -> {
serviceConnection.rollback(new FakeSavepoint());
RpcRequest request = map.get(null);
request.setIDToInvoke(IDtoInvoke);
dispatcher.invokeAsRequest(request, beanCache);
});
Assertions.assertThrows(SQLException.class, () -> {
serviceConnection.releaseSavepoint(new FakeSavepoint());
RpcRequest request = map.get(null);
request.setIDToInvoke(IDtoInvoke);
dispatcher.invokeAsRequest(request, beanCache);
});
}
@Test
void testCreateArrayOf() throws SQLException {
Array array = connection.createArrayOf("DOUBLE", new Double[]{0.1, 0.2});

1
test/src/test/java/com/fanruan/InterceptorIT.java

@ -30,6 +30,7 @@ public class InterceptorIT implements MethodInterceptor {
return methodProxy.invokeSuper(o, objects);
}
Class<?>[] actualArgTypes = InterceptorUtils.getArgTypes(objects);
RpcRequest request = InterceptorUtils.generateRequest(clazz, o, method, objects);
map.put(null, request);

78
test/src/test/java/com/fanruan/SavePointTest.java

@ -2,6 +2,7 @@ package com.fanruan;
import org.junit.jupiter.api.*;
import java.io.ByteArrayInputStream;
import java.sql.*;
/**
@ -30,7 +31,82 @@ public class SavePointTest extends BaseJDBCTest{
// }
@Test
void testSavePoint() throws SQLException{
void testSavePoint1() throws SQLException{
connection.setAutoCommit(false);
Statement statement = connection.createStatement();
statement.execute("DROP TABLE if exists testSP;");
statement.execute("CREATE TABLE testSP (id int);");
connection.commit();
statement.executeUpdate("INSERT INTO testSP VALUES (1),(2);");
ResultSet rs1 = statement.executeQuery("SELECT * FROM testSP;");
Assertions.assertEquals(2, getSizeOfResultSet(rs1));
Savepoint savepoint = connection.setSavepoint("test");
statement.executeUpdate("DELETE FROM testSP WHERE id = 1;");
ResultSet rs2 = statement.executeQuery("SELECT * FROM testSP;");
Assertions.assertEquals(1, getSizeOfResultSet(rs2));
connection.rollback(savepoint);
ResultSet rs3 = statement.executeQuery("SELECT * FROM testSP;");
Assertions.assertEquals(2, getSizeOfResultSet(rs3));
connection.commit();
}
@Test
void testSavePoin2t() throws SQLException{
connection.setAutoCommit(false);
Statement statement = connection.createStatement();
statement.execute("DROP TABLE if exists testSP;");
statement.execute("CREATE TABLE testSP (id int);");
connection.commit();
statement.executeUpdate("INSERT INTO testSP VALUES (1),(2);");
ResultSet rs1 = statement.executeQuery("SELECT * FROM testSP;");
Assertions.assertEquals(2, getSizeOfResultSet(rs1));
statement.executeUpdate("DELETE FROM testSP WHERE id = 1;");
ResultSet rs2 = statement.executeQuery("SELECT * FROM testSP;");
Assertions.assertEquals(1, getSizeOfResultSet(rs2));
connection.rollback();
ResultSet rs3 = statement.executeQuery("SELECT * FROM testSP;");
Assertions.assertEquals(0, getSizeOfResultSet(rs3));
Savepoint savepoint = connection.setSavepoint();
connection.releaseSavepoint(savepoint);
connection.commit();
closeSQLObjects(statement);
}
@Test
void testUpdatableStream() throws SQLException{
Statement statement = connection.createStatement();
statement.executeUpdate("DROP TABLE IF EXISTS updateStreamTest");
statement.executeUpdate("CREATE TABLE updateStreamTest (keyField INT NOT NULL AUTO_INCREMENT PRIMARY KEY, field1 BLOB)");
int streamLength = 1024;
byte[] streamData = new byte[streamLength];
/* create an updatable statement */
Statement updStmt = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
/* fill the resultset with some values */
ResultSet updRs = updStmt.executeQuery("SELECT * FROM updateStreamTest");
/* move to insertRow */
updRs.moveToInsertRow();
/* update the table */
updRs.updateBinaryStream("field1", new ByteArrayInputStream(streamData), streamLength);
updRs.insertRow();
}
}

Loading…
Cancel
Save