# README ## 项目功能 ​ 为达到`Service` 从公网访问客户端所在内网中数据源的效果,通过运行在客户机上的代理程序代理`Service`的所有`JDBC`请求,并将查询结果返回给`Service`。实现目标,`Service`除更改使用的`JDBC`驱动外,对代理存在无感知,支持主流的包含`JDBC`支持的数据库。 ## 项目依赖 `Netty-socketio`与`Socket.io-client-Java`的对应关系是: | [`netty-socketio`](https://github.com/mrniko/netty-socketio) | [`Java client`](https://github.com/socketio/socket.io-client-java) | | ------------------------------------------------------------ | ------------------------------------------------------------ | | 1.7.19 | 1.0.x | | 暂无 | [Document](https://socketio.github.io/socket.io-client-java/installation.html) | 以下用`Service`指代`Socket`连接中的`socket`服务器,它也是需求查询用户内网数据源的公网服务器。 用`Agent`指代`Socket`连接中的客户端,也是运行在用户`PC`上承担远程调用`JDBC`方法的代理服务。 具体结构见下文项目结构图。 ## Quick Start 1. 运行`Service`模块下 `ServiceTest` 2. 运行`Agent`下 Test ## 实现方案 1. `Service` 启动`socket`服务与 `Agent`建立连接后,可以开始使用代理进行查询。 2. `Service`端通过自实现的`JDBC`驱动,进行`JDBC`操作。驱动中使用基于`CGlib`的动态代理,对`Service`端的所有`JDBC`相关驱动类进行增强,所有方法信息会被序列化传递到`Agent`执行,并有选择地将结果回送到`Service` ## 结构与流程 ![projec structure](https://github.com/yichen97/Intranet/blob/master/pic/project%20structure.jpg) 如上图,对于`Service` 端来讲,`Agent`对其的代理是无感知的。在`Service`来看,只是调用了一个自定义的`JDBC`驱动进行查询。 这得益于驱动内部方法地重写,自定义地实现类在`Agent`和`Service`中有相同的名字,但内部实现却不相同,这使得整个RPC的流程十分灵活。 ## 方案细节 ### 动态代理 动态代理是该项目中的核心,如在 `Driver`类的 `connect`方法中:返回的`Connection`就被替换为了动态代理增强过的`MyConnection`,实现对`Service`中调用的`JDBC`方法的完全代理。代理类会依靠`info`从缓存中找到命名空间(本项目中以`/dataSoure Name`来区别命名空间)对应的`socket`,将方法调用信息以`RPCReqquest`的方式序列化后发送出去。 ```java // In Service Source Code @Override public Connection connect(String url, Properties info) throws SQLException { String agentID = info.getProperty("agentID"); String dbName = info.getProperty("agentDBName"); if(dbName == null){ dbName = url.split(":")[1]; info.setProperty("agentDBName", dbName); } MyConnection myConn = (MyConnection) ProxyFactory.getProxy(MyConnection.class, info); myConn.setInfo(info); return myConn; } ``` RPC实体类包含如下信息: ```java @Data @Accessors(chain = true) public class RpcRequest { // Marks whether the method delivered need loopback data private boolean reply; // Marks whether the method will create an instance requeired to be cached. private boolean binding; private String ID; private String IDtoInvoke; private Class ServiceClass; private String MethodName; private Object[] args; private Class[] argTypes; } ``` 在`Agent`收到`Request`的时候,会按照报文要求对方法进行调用,某些创建的实例会被缓存,以便之后调用。在本项目中,这些实例的类是: ``` Drive( MyDriver ), Connection( MyConnection ), Statement( MyStatement ), PreparedStatement( MyPreparedStatement ), ResultSet( MyResult ) ``` ```java public Object invokeAsRequest(RpcRequest rpcRequest, BeanCache beanCache) { ... // The ID of the rpcRequest could be save as the ID of an instance // Because one instance can only been create just once for an unique rpcRequest String IDtoCache = rpcRequest.getID(); String IDtoInvoke = rpcRequest.getIDtoInvoke(); ... ``` ### 注解解耦 在`Service`端,定义了以下注解。 ``` annotation - BindingParameter - LocalMethod - RemoteClass - WithBindingParameter ``` 如上文所说,注册了自定义驱动后。生成的增强类所调用的方法都会生成 RPC 请求,为了能够松散的实现对不同方法的请求控制,需要一些自定义的注解。 `LocalMethod`:方法注释,标记不需要远程调用的方法。 `RemoteClass`:类注释,标记该类远程调用时对应的类。 `WithBindingParameter`: 方法注释,标记包含绑定类方法。因为绑定类作为参数时,实际需要传递的实际是`Agentxx`类中持有的成员变量。 `BindingParameter`: 参数注释,标记参数中的绑定类。 在`Agent`端,定义了以下注解注解。 ```java annotation - BindClass - BindingParameter - WithBindingParameter ``` `BindingPatameter`和`WithBindingParameter`同上。 `BindClass`:类注释,标记绑定类,在`Agent`中用于判断该实例是否需要缓存。 ### RPC调用 在一次RPC调用流程中,`FutureTask` 异步获取返回结果,以“生产者-消费者”模型实现一次调用的同步管理。 `ClientWrapper` 持有着各个命名空间上的`socket`。在这些`socket`上的通信,每次调用,会在`wrapper`中注册一个工具类:`LockAndCondition`,发出消息后,等待`socket`上出现对应的响应报文唤醒`FutureTask` 线程。通过锁机制,保证逻辑的正确性。 ```java @Data @AllArgsConstructor @NoArgsConstructor public class ClientWrapper { private SocketIOClient client; private static Map lockMap = new ConcurrentHashMap<>(); public SocketIOClient getClient(){ if(client == null) throw new RuntimeException("no such client"); return client; } public LockAndCondition getLockAndCondition(String messageID){ LockAndCondition lac = lockMap.get(messageID); if(lac == null){ ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); lac = new LockAndCondition(lock, condition); lockMap.put(messageID, lac); } return lac; } public void removeLockAndCondition(String messageID){ lockMap.remove(messageID); } } ``` ```java @Data @NoArgsConstructor @AllArgsConstructor public class LockAndCondition{ private ReentrantLock lock; private Condition condition; private Object result; private String BindingID; LockAndCondition(ReentrantLock lock, Condition condition){ this.lock = lock; this.condition = condition; } } ``` ```java FutureTask futureTask = new FutureTask( new Callable() { @Override public Object call() throws Exception { Object res = null; ClientWrapper wrapper = ClientCache.getClientWrapper(agentID, dbName); LockAndCondition lac = wrapper.getLockAndCondition(rpcRequest.getID()); ReentrantLock lock = lac.getLock(); Condition condition = lac.getCondition(); try{ byte[] bytes = ServerStater.serializer.serialize(rpcRequest); lock.lock(); client.sendEvent("RPCRequest", bytes); condition.await(); // get res from RPC response data res = lac.getResult(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return res; } } ); ServerStater.threadPool.submit(futureTask); Object res = futureTask.get(); ``` `socket`收到响应时解锁对应的线程。 ```java // rpcResponse nameSpace.addEventListener("RPCResponse", byte[].class, ((client, data, ackRequest) -> { RpcResponse rpcResponse = serializer.deserialize(data, RpcResponse.class); logger.debug("RPCResponse: " + (rpcResponse.getStatus() ? "success" : "fail")); String agentID = Commons.getAgentID(client); String dbName = Commons.getDBName(client); ClientWrapper wrapper = ClientCache.getClientWrapper(agentID, dbName); LockAndCondition lac = wrapper.getLockAndCondition(rpcResponse.getID()); ReentrantLock lock = lac.getLock(); Condition condition = lac.getCondition(); // When a response is received, it notifies that the futuretask thread blocking on the lockandcondition // If the response contains data, take it out. try { lock.lock(); Object resultData = rpcResponse.getResult(); if(!rpcResponse.getStatus()){ logger.error(resultData); resultData = null; } if(resultData != null) lac.setResult(resultData); condition.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } wrapper.removeLockAndCondition(rpcResponse.getID()); logger.debug("received response message, signaled condition"); })); ``` `Service`是使用`netty`实现的高效同步非阻塞`IO`,上文的同步机制可以很大程度上利用`socket`的并发效果。 ### 绑定实例 确定`Agent`上缓存实例与`Service`端实例的一一对应关系是很必要,不然程序在反射调用方法时会产生问题。 例如,对于`createStatement()`方法必须由上一步生成的`Connection`类进行调用。为了达到这一点,这些`Service`端实例必须和`Agent`端具有相同的ID。 考虑到在进行`RPC`调用回调的时候,利用时间和随机数生成了一个唯一`ID`。 ```java public static String getID(){ return getTimeInMillis() + getRandom(); } public static String getTimeInMillis() { long timeInMillis = Calendar.getInstance().getTimeInMillis(); return timeInMillis+""; } public static String getRandom() { Random random = new Random(); int nextInt = random.nextInt(9000000); nextInt=nextInt+1000000; String str=nextInt+""; return str; } ``` 而`Agent`端的缓存实例是由某次调用产生的,所以只需将该次调用的`RPC`报文`ID`标记在实例上,并在收到`RPC`响应时为需要绑定的类型打上同样的标记即可。这样`Agent`方面,由于存储的实例都有了唯一的`ID`作为键,大大简化了缓存系统的复杂性。 标记实现: ```java @Override public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { ... Object returnObj = methodProxy.invokeSuper(o, objects); // If the return instance is corresponding with another instance in agent, set the binding ID. if (InterceptorUtils.isInBindList(returnObj)){ InterceptorUtils.setInvokeHelper(returnObj, "setID", rpcRequest.getID()); } ``` # 测试 测试在`Intranet`中`test`内。 ## 启动流程 1. 构造客户端与服务端实例 可见于`AbstractDriverTest`类 ```java static final String[][] dbNameAndDriver = new String[][]{ DBProperties.HSQL }; // 服务端与客户端应注册同样的一个或多个驱动,但服务端中不需要加载,仅仅用于构造命名空间 static final ServerStater server = new ServerStater(dbNameAndDriver); static final AgentStarter agent = new AgentStarter(dbNameAndDriver); ``` 2. 此时 代理端/服务端 实例已经启动,服务端应优先注册代理端信息,否则会报错。 ```java info.setProperty("user", "sa"); info.setProperty("password", ""); info.setProperty("agentID", "1001"); info.setProperty("agentDBName", DBProperties.HSQL[0]); ``` 3. 模拟服务端等待连接 ```java static void openSocket(){ while(ClientCache.getClient( info.getProperty("agentID"), info.getProperty("agentDBName")) == null){ } } ``` 4. 连接就绪即可获取连接,此时运行`TestSuit`即可。 ## 使用Junit5 和 HSQL 进行集成测试 这一次项目的测试记录,项目用 IDEA 编写,`Junit 5`进行单元测试,`HSQL DB`提供方便的内存数据库。 ## Use Junit 5 ### Configuring JUnit Platform To get started with JUnit Platform, you need to add at least a single `TestEngine` implementation to your project. For example, if you want to write tests with Jupiter, add the test artifact `junit-jupiter-engine` to the dependencies in POM: ```xml [...] org.junit.jupiter junit-jupiter-engine 5.4.0 test [...] ``` This will pull in all required dependencies. Among those dependencies is `junit-jupiter-api` which contains the classes and interfaces your test source requires to compile. `junit-platform-engine` is also resolved and added. This is the only step that is required to get started - you can now create tests in your test source directory (`src/test/java`). ### 集成测试的细节 #### [生命周期](https://junit.org/junit5/docs/current/user-guide/#writing-tests-test-instance-lifecycle) > In order to allow individual test methods to be executed in isolation and to avoid unexpected side effects due to mutable test instance state, JUnit creates a new instance of each test class before executing each *test method* (see [Test Classes and Methods](https://junit.org/junit5/docs/current/user-guide/#writing-tests-classes-and-methods)). This "per-method" test instance lifecycle is the default behavior in JUnit Jupiter and is analogous to all previous versions of JUnit. > > If you would prefer that JUnit Jupiter execute all test methods on the same test instance, annotate your test class with `@TestInstance(Lifecycle.PER_CLASS)`. When using this mode, a new test instance will be created once per test class. Thus, if your test methods rely on state stored in instance variables, you may need to reset that state in `@BeforeEach` or `@AfterEach` methods. 根据官方文档,Junit5的默认测试实例的生命周期为`pre-method`在集成测试时,应当选择`pre_class`,即一个测试类创建一个实例。 #### 成员变量 在`pre-class`的生命周期下,整个测试过程中,测试方法有时需要共享变量。比如进行`JDBC查询`的时候,只需要共用一个`connection`,甚至是一个`statement`. #### [Method Order](https://junit.org/junit5/docs/current/user-guide/#writing-tests-test-execution-order-methods) > Although true *unit tests* typically should not rely on the order in which they are executed, there are times when it is necessary to enforce a specific test method execution order — for example, when writing *integration tests* or *functional tests* where the sequence of the tests is important, especially in conjunction with `@TestInstance(Lifecycle.PER_CLASS)` 除上述两点之外,在集成测试中,必不可少的一项是使用Order控制方法的执行顺序。 例子: ```java import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; @TestMethodOrder(OrderAnnotation.class) class OrderedTestsDemo { @Test @Order(1) void nullValues() { // perform assertions against null values } @Test @Order(2) void emptyValues() { // perform assertions against empty values } @Test @Order(3) void validValues() { // perform assertions against valid values } } ``` ## 使用 HSQL DB ### 运行模式 HSQL DB 是一款纯Java编写的免费数据库,并且支持Memory-Only模型,所以每个单测使用独立的数据库就有了可能,Memory-Only模型数据不会持久化跑完即销毁,从根源上解决了测试难以复现和测试对数据的污染问题。 [Memory-Only 内存模式](https://blog.csdn.net/smmi/article/details/83779238) ### 版本问题 maven 依赖 ```xml org.hsqldb hsqldb 2.5.2 test ``` 根据官网: > Latest version 2.7.0 works with JDK 8 and above. Version for JDK 6 is also available. 但是这不代表在测试的时候可以直接调用 2.7.0 版本的代码,因为该版本使用 jdk 11 编译,在使用 IDEA 测试时,直接引用会报错,所以版本应该选择 2.5x . 代码: ```java public class TestHSQL{ @BeforeAll static void startHSQL(){ try { Class.forName(DBProperties.HSQL_DTIVER_NAME); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } ... } ``` 错误信息: ``` java.lang.UnsupportedClassVersionError: org/hsqldb/jdbcDriver has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0 ``` ### 语法问题 HSQL 与 MySQL 语法并不相同,[HSQL语法](https://www.hsqldb.org/doc/2.0/guide/databaseobjects-chapt.html#dbc_schema_def_statements),英文文档读起来稍微有点费劲。 ```mysql CREATE TABLE student1 ( student_id int(32) PRIMARY KEY AUTO_INCREMENT NOT NULL, student_name varchar(100) NOT NULL, student_address varchar(100) NOT NULL ); ``` 如上的sql语句在程序中要这么写,差别还是很多的,对于大部分在标准 SQL 语句之外的语句都需要先检查是否使用了正确的语法。 ```java st.executeUpdate("CREATE TABLE student (" + "student_id INTEGER GENERATED BY DEFAULT AS IDENTITY " + "(START WITH 1, INCREMENT BY 1) NOT NULL," + "student_name VARCHAR(100) NOT NULL," + "student_address VARCHAR(100) NOT NULL," + "PRIMARY KEY (student_id)" + ");"); ``` 其实还有个取巧的方法,可以使用参数直接开启`mysql`语法支持,但依然有些语法`MySQL`特点`HSQKDB`并不支持,此时应参考`HDBSQL`所实现的`SQL92`标准。 ![image-20220823100800096](C:\Users\85065\Desktop\fr\内网穿透\pic\sql.mysql.png) ``` "jdbc:hsqldb:mem:test;sql.syntax_mys=true" ``` ## 使用示例 ```java import com.fanruan.AgentStarter; import com.fanruan.ServerStater; import com.fanruan.service.jdbc.driver.ServiceDriver; import com.fanruan.proxy.ProxyFactory; import com.fanruan.utils.DBProperties; import org.junit.jupiter.api.*; import java.sql.*; import java.util.Properties; /** * @author Yichen Dai * @date 2022/8/18 15:27 */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class TestUtil { static Connection conn = null; static Statement st = null; static PreparedStatement pst = null; static ResultSet rs = null; static void configService(){ // 启动socket服务器 String[][] DBs = new String[][]{ DBProperties.HSQL, }; new ServerStater(DBs); } static void configAgent(){ // 启动socket客户端 String[][] DBs = new String[][]{ DBProperties.HSQL, }; new AgentStarter(DBs); } @BeforeAll static void autoConfig(){ configService(); configAgent(); try { // 等待socket连接 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } @Test @Order(1) void testConnect() throws SQLException { // 建立连接 Properties info = new Properties(); info.setProperty("user", "sa"); info.setProperty("password", ""); info.setProperty("agentID", "1001"); info.setProperty("agentDBName", DBProperties.HSQL[0]); Driver driver = (ServiceDriver) ProxyFactory.getProxy(ServiceDriver.class, null); conn = driver.connect("jdbc:hsqldb:mem:test;sql.syntax_mys=true", info); } @Test @Order(2) void testCreateTable1() throws SQLException { // 创建 statement st = conn.createStatement(); // 创建表 int num = st.executeUpdate("DROP TABLE student IF EXISTS;"); Assertions.assertEquals(0, num); num = st.executeUpdate("CREATE TABLE student (" + "student_id INTEGER GENERATED BY DEFAULT AS IDENTITY " + "(START WITH 1, INCREMENT BY 1) NOT NULL," + "student_name VARCHAR(100) NOT NULL," + "student_address VARCHAR(100) NOT NULL," + "PRIMARY KEY (student_id)" + ");"); Assertions.assertEquals(0, num); } @Test @Order(3) void testCreateTable2() throws SQLException { // 创建表 int num = st.executeUpdate("DROP TABLE student_score IF EXISTS;"); Assertions.assertEquals(0, num); num = st.executeUpdate("CREATE TABLE score (" + "student_id int(10) PRIMARY KEY NOT NULL," + "score int(10) NOT NULL" + ");" ); Assertions.assertEquals(0, num); } @Test @Order(4) void testInsert1() throws SQLException { // 插入数据 int num = st.executeUpdate("INSERT INTO student VALUES" + "(1, '张三', '上海')," + "(2, '李四', '北京')," + "(3, '王五', '成都');"); Assertions.assertEquals(3, num); } @Test @Order(5) void testInsert2() throws SQLException { // 插入数据 int num = st.executeUpdate("INSERT INTO score VALUES" + "(1, 645)," + "(2, 627)," + "(3, 591);"); Assertions.assertEquals(3, num); } @Test @Order(6) void testUpdate() throws SQLException { // 预查询语句 删除指定 ID pst = conn.prepareStatement("UPDATE student" + " SET student_name = '李华', student_address = '杭州'"+ "WHERE student_id = ?"); Assertions.assertNotNull(pst); pst.setInt(1, 1); int num = pst.executeUpdate(); Assertions.assertEquals(1, num); } @Test @Order(7) void testDelete() throws SQLException { // 预查询语句 删除指定 ID pst = conn.prepareStatement("delete from student where student_id = ?"); Assertions.assertNotNull(pst); pst.setInt(1, 3); int num = pst.executeUpdate(); Assertions.assertEquals(1, num); } @Test @Order(8) void testSelect() throws SQLException { rs = st.executeQuery("select * from student;"); String[] nameStrings = new String[]{"李华", "李四", "王五"}; String[] addressString = new String[]{"杭州", "北京", "成都"}; // 结果集断言 int num = 1; while(rs.next()) { Assertions.assertEquals(num, rs.getInt("student_id")); Assertions.assertEquals(nameStrings[num-1], rs.getString("student_name")); Assertions.assertEquals(addressString[num-1], rs.getString("student_address")); num++; } } @Test @Order(9) void testSubSelect() throws SQLException { // 插入数据 rs = st.executeQuery( "SELECT student_name FROM student " + "WHERE student_id IN " + "(SELECT student_id " + "FROM score " + "WHERE score > 600);" ); String[] nameStrings = new String[]{"李华", "李四", "王五"}; // 结果集断言 int num = 1; while(rs.next()) { Assertions.assertEquals(nameStrings[num-1], rs.getString("student_name")); num++; } } @Test @Order(10) void testJoin() throws SQLException { // 插入数据 rs = st.executeQuery( "SELECT A.student_name " + "FROM student A JOIN score B " + "ON A.student_id = B.student_id " + "WHERE score > 600;" ); String[] nameStrings = new String[]{"李华", "李四", "王五"}; // 结果集断言 int num = 1; while (rs.next()) { Assertions.assertEquals(rs.getString("student_name"), nameStrings[num - 1]); num++; } } } ``` # 开发手册 ## Service 添加`JDBC`类 ### 实现接口 1. 以`Service` + `接口名`命名一个类 2. 实现接口 3. 继承`BasedBind`类 4. 对于返回类属于绑定类的情况 1. 使用代理工厂返回替身类 2. 如果该类也有会返回绑定类的方法,对该类调用`setInfo(info)` 5. 为该类添加`RemoteClass(remoteClassName = Agent上对应类的全限定类名)`注解。 ### 继承抽象类 1. 以`Service` + `抽象类名`命名一个类 2. 继承抽象类 3. 粘贴所需方法和字段 ```java private String ID; @LocalMethod public String getID(){ return this.ID; } @LocalMethod public void setID(String ID){ this.ID = ID; } @LocalMethod @Override public int hashCode(){ return super.hashCode(); } @LocalMethod @Override public boolean equals(Object obj) { return super.equals(obj); } @LocalMethod @Override protected Object clone() throws CloneNotSupportedException { return super.clone(); } @LocalMethod @Override public String toString() { return super.toString(); } @LocalMethod @Override protected void finalize() throws Throwable { super.finalize(); } ``` 4. 对于返回类属于绑定类的情况(包括抽象方法) 1. 使用代理工厂返回替身类 2. 如果该类也有会返回绑定类的方法,对该类调用`setInfo(info)` 5. 为该类添加`RemoteClass(remoteClassName = Agent上对应类的全限定类名)`注解。 ## Agent 添加 `JDBC`类 ### 实现接口 1. 以`Agent + 接口名`命名一个类 2. 实现接口 3. 组合一个接口类型的成员变量,创建一个入参为该接口类型的构造方法 4. 重写该接口的所有方法 1. 返回值为普通变量,直接用3中成员变量调用该方法 2. 返回值为绑定对象,返回包装对象:`new Agent+接口名(成员变量.方法(参数))` 3. 链式调用方法return this 5. 为该类添加`BindClass`注解 ### 继承抽象类 ``` 1. 以`Agent + 继承类名`命名一个类 2. 继承类 3. 组合一个接口类型的成员变量,创建一个入参为该接口类型的构造方法 4. 重写该接口的所有方法(包括继承的抽象方法) 1. 返回值为普通变量,直接用3中成员变量调用该方法 2. 返回值为绑定对象,返回包装对象:`new Agent+接口名(成员变量.方法(参数))` 3. 链式调用方法return this 5. 为该类添加`BindClass`注解 ``` # 项目参考 [nuzzle: A Simple RPC Project](https://github.com/sakiila/nuzzle) [CSV JDBC Driver](https://github.com/peterborkuti/csv-jdbc-driver)