yichen
975c8ff0f3
|
2 years ago | |
---|---|---|
agent | 2 years ago | |
pic | 2 years ago | |
service | 2 years ago | |
test | 2 years ago | |
.gitignore | 2 years ago | |
README.md | 2 years ago | |
pom.xml | 2 years ago |
README.md
README
项目功能
为达到Service
从公网访问客户端所在内网中数据源的效果,通过运行在客户机上的代理程序代理Service
的所有JDBC
请求,并将查询结果返回给Service
。实现目标,Service
除更改使用的JDBC
驱动外,对代理存在无感知,支持主流的包含JDBC
支持的数据库。
项目依赖
Netty-socketio
与Socket.io-client-Java
的对应关系是:
netty-socketio |
Java client |
---|---|
1.7.19 | 1.0.x |
暂无 | Document |
以下用Service
指代Socket
连接中的socket
服务器,它也是需求查询用户内网数据源的公网服务器。
用Agent
指代Socket
连接中的客户端,也是运行在用户PC
上承担远程调用JDBC
方法的代理服务。
具体结构见下文项目结构图。
Quick Start
- 运行
Service
模块下ServiceTest
- 运行
Agent
下 Test
实现方案
-
Service
启动socket
服务与Agent
建立连接后,可以开始使用代理进行查询。 -
Service
端通过自实现的JDBC
驱动,进行JDBC
操作。驱动中使用基于CGlib
的动态代理,对Service
端的所有JDBC
相关驱动类进行增强,所有方法信息会被序列化传递到Agent
执行,并有选择地将结果回送到Service
结构与流程
如上图,对于Service
端来讲,Agent
对其的代理是无感知的。在Service
来看,只是调用了一个自定义的JDBC
驱动进行查询。
这得益于驱动内部方法地重写,自定义地实现类在Agent
和Service
中有相同的名字,但内部实现却不相同,这使得整个RPC的流程十分灵活。
方案细节
动态代理
动态代理是该项目中的核心,如在 Driver
类的 connect
方法中:返回的Connection
就被替换为了动态代理增强过的MyConnection
,实现对Service
中调用的JDBC
方法的完全代理。代理类会依靠info
从缓存中找到命名空间(本项目中以/dataSoure Name
来区别命名空间)对应的socket
,将方法调用信息以RPCReqquest
的方式序列化后发送出去。
// In Service Source Code
@Override
public Connection connect(String url, Properties info) throws SQLException {
String agentID = info.getProperty("agentID");
String dbName = info.getProperty("agentDBName");
if(dbName == null){
dbName = url.split(":")[1];
info.setProperty("agentDBName", dbName);
}
MyConnection myConn = (MyConnection) ProxyFactory.getProxy(MyConnection.class, info);
myConn.setInfo(info);
return myConn;
}
RPC实体类包含如下信息:
@Data
@Accessors(chain = true)
public class RpcRequest {
// Marks whether the method delivered need loopback data
private boolean reply;
// Marks whether the method will create an instance requeired to be cached.
private boolean binding;
private String ID;
private String IDtoInvoke;
private Class ServiceClass;
private String MethodName;
private Object[] args;
private Class[] argTypes;
}
在Agent
收到Request
的时候,会按照报文要求对方法进行调用,某些创建的实例会被缓存,以便之后调用。在本项目中,这些实例的类是:
Drive( MyDriver ), Connection( MyConnection ), Statement( MyStatement ), PreparedStatement( MyPreparedStatement ), ResultSet( MyResult )
public Object invokeAsRequest(RpcRequest rpcRequest, BeanCache beanCache) {
...
// The ID of the rpcRequest could be save as the ID of an instance
// Because one instance can only been create just once for an unique rpcRequest
String IDtoCache = rpcRequest.getID();
String IDtoInvoke = rpcRequest.getIDtoInvoke();
...
注解解耦
在Service
端,定义了以下注解。
annotation
- BindingParameter
- LocalMethod
- RemoteClass
- WithBindingParameter
如上文所说,注册了自定义驱动后。生成的增强类所调用的方法都会生成 RPC 请求,为了能够松散的实现对不同方法的请求控制,需要一些自定义的注解。
LocalMethod
:方法注释,标记不需要远程调用的方法。
RemoteClass
:类注释,标记该类远程调用时对应的类。
WithBindingParameter
: 方法注释,标记包含绑定类方法。因为绑定类作为参数时,实际需要传递的实际是Agentxx
类中持有的成员变量。
BindingParameter
: 参数注释,标记参数中的绑定类。
在Agent
端,定义了以下注解注解。
annotation
- BindClass
- BindingParameter
- WithBindingParameter
BindingPatameter
和WithBindingParameter
同上。
BindClass
:类注释,标记绑定类,在Agent
中用于判断该实例是否需要缓存。
RPC调用
在一次RPC调用流程中,FutureTask
异步获取返回结果,以“生产者-消费者”模型实现一次调用的同步管理。
ClientWrapper
持有着各个命名空间上的socket
。在这些socket
上的通信,每次调用,会在wrapper
中注册一个工具类:LockAndCondition
,发出消息后,等待socket
上出现对应的响应报文唤醒FutureTask
线程。通过锁机制,保证逻辑的正确性。
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ClientWrapper {
private SocketIOClient client;
private static Map<String, LockAndCondition> lockMap = new ConcurrentHashMap<>();
public SocketIOClient getClient(){
if(client == null) throw new RuntimeException("no such client");
return client;
}
public LockAndCondition getLockAndCondition(String messageID){
LockAndCondition lac = lockMap.get(messageID);
if(lac == null){
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lac = new LockAndCondition(lock, condition);
lockMap.put(messageID, lac);
}
return lac;
}
public void removeLockAndCondition(String messageID){
lockMap.remove(messageID);
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LockAndCondition{
private ReentrantLock lock;
private Condition condition;
private Object result;
private String BindingID;
LockAndCondition(ReentrantLock lock, Condition condition){
this.lock = lock;
this.condition = condition;
}
}
FutureTask<Object> futureTask = new FutureTask<Object>(
new Callable<Object>() {
@Override
public Object call() throws Exception {
Object res = null;
ClientWrapper wrapper = ClientCache.getClientWrapper(agentID, dbName);
LockAndCondition lac = wrapper.getLockAndCondition(rpcRequest.getID());
ReentrantLock lock = lac.getLock();
Condition condition = lac.getCondition();
try{
byte[] bytes = ServerStater.serializer.serialize(rpcRequest);
lock.lock();
client.sendEvent("RPCRequest", bytes);
condition.await();
// get res from RPC response data
res = lac.getResult();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
return res;
}
}
);
ServerStater.threadPool.submit(futureTask);
Object res = futureTask.get();
socket
收到响应时解锁对应的线程。
// rpcResponse
nameSpace.addEventListener("RPCResponse", byte[].class, ((client, data, ackRequest) -> {
RpcResponse rpcResponse = serializer.deserialize(data, RpcResponse.class);
logger.debug("RPCResponse: " + (rpcResponse.getStatus() ? "success" : "fail"));
String agentID = Commons.getAgentID(client);
String dbName = Commons.getDBName(client);
ClientWrapper wrapper = ClientCache.getClientWrapper(agentID, dbName);
LockAndCondition lac = wrapper.getLockAndCondition(rpcResponse.getID());
ReentrantLock lock = lac.getLock();
Condition condition = lac.getCondition();
// When a response is received, it notifies that the futuretask thread blocking on the lockandcondition
// If the response contains data, take it out.
try {
lock.lock();
Object resultData = rpcResponse.getResult();
if(!rpcResponse.getStatus()){
logger.error(resultData);
resultData = null;
}
if(resultData != null) lac.setResult(resultData);
condition.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
wrapper.removeLockAndCondition(rpcResponse.getID());
logger.debug("received response message, signaled condition");
}));
Service
是使用netty
实现的高效同步非阻塞IO
,上文的同步机制可以很大程度上利用socket
的并发效果。
绑定实例
确定Agent
上缓存实例与Service
端实例的一一对应关系是很必要,不然程序在反射调用方法时会产生问题。
例如,对于createStatement()
方法必须由上一步生成的Connection
类进行调用。为了达到这一点,这些Service
端实例必须和Agent
端具有相同的ID。
考虑到在进行RPC
调用回调的时候,利用时间和随机数生成了一个唯一ID
。
public static String getID(){
return getTimeInMillis() + getRandom();
}
public static String getTimeInMillis() {
long timeInMillis = Calendar.getInstance().getTimeInMillis();
return timeInMillis+"";
}
public static String getRandom() {
Random random = new Random();
int nextInt = random.nextInt(9000000);
nextInt=nextInt+1000000;
String str=nextInt+"";
return str;
}
而Agent
端的缓存实例是由某次调用产生的,所以只需将该次调用的RPC
报文ID
标记在实例上,并在收到RPC
响应时为需要绑定的类型打上同样的标记即可。这样Agent
方面,由于存储的实例都有了唯一的ID
作为键,大大简化了缓存系统的复杂性。
标记实现:
@Override
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { ...
Object returnObj = methodProxy.invokeSuper(o, objects);
// If the return instance is corresponding with another instance in agent, set the binding ID.
if (InterceptorUtils.isInBindList(returnObj)){
InterceptorUtils.setInvokeHelper(returnObj, "setID", rpcRequest.getID());
}
测试
测试在Intranet
中test
内。
启动流程
-
构造客户端与服务端实例
可见于
AbstractDriverTest
类static final String[][] dbNameAndDriver = new String[][]{ DBProperties.HSQL }; // 服务端与客户端应注册同样的一个或多个驱动,但服务端中不需要加载,仅仅用于构造命名空间 static final ServerStater server = new ServerStater(dbNameAndDriver); static final AgentStarter agent = new AgentStarter(dbNameAndDriver);
-
此时 代理端/服务端 实例已经启动,服务端应优先注册代理端信息,否则会报错。
info.setProperty("user", "sa"); info.setProperty("password", ""); info.setProperty("agentID", "1001"); info.setProperty("agentDBName", DBProperties.HSQL[0]);
-
模拟服务端等待连接
static void openSocket(){ while(ClientCache.getClient( info.getProperty("agentID"), info.getProperty("agentDBName")) == null){ } }
-
连接就绪即可获取连接,此时运行
TestSuit
即可。
使用Junit5 和 HSQL 进行集成测试
这一次项目的测试记录,项目用 IDEA 编写,Junit 5
进行单元测试,HSQL DB
提供方便的内存数据库。
Use Junit 5
Configuring JUnit Platform
To get started with JUnit Platform, you need to add at least a single TestEngine
implementation to your project. For example, if you want to write tests with Jupiter, add the test artifact junit-jupiter-engine
to the dependencies in POM:
<dependencies>
[...]
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.4.0</version>
<scope>test</scope>
</dependency>
[...]
</dependencies>
This will pull in all required dependencies. Among those dependencies is junit-jupiter-api
which contains the classes and interfaces your test source requires to compile. junit-platform-engine
is also resolved and added.
This is the only step that is required to get started - you can now create tests in your test source directory (src/test/java
).
集成测试的细节
生命周期
In order to allow individual test methods to be executed in isolation and to avoid unexpected side effects due to mutable test instance state, JUnit creates a new instance of each test class before executing each test method (see Test Classes and Methods). This "per-method" test instance lifecycle is the default behavior in JUnit Jupiter and is analogous to all previous versions of JUnit.
If you would prefer that JUnit Jupiter execute all test methods on the same test instance, annotate your test class with
@TestInstance(Lifecycle.PER_CLASS)
. When using this mode, a new test instance will be created once per test class. Thus, if your test methods rely on state stored in instance variables, you may need to reset that state in@BeforeEach
or@AfterEach
methods.
根据官方文档,Junit5的默认测试实例的生命周期为pre-method
在集成测试时,应当选择pre_class
,即一个测试类创建一个实例。
成员变量
在pre-class
的生命周期下,整个测试过程中,测试方法有时需要共享变量。比如进行JDBC查询
的时候,只需要共用一个connection
,甚至是一个statement
.
Method Order
Although true unit tests typically should not rely on the order in which they are executed, there are times when it is necessary to enforce a specific test method execution order — for example, when writing integration tests or functional tests where the sequence of the tests is important, especially in conjunction with
@TestInstance(Lifecycle.PER_CLASS)
除上述两点之外,在集成测试中,必不可少的一项是使用Order控制方法的执行顺序。
例子:
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
@TestMethodOrder(OrderAnnotation.class)
class OrderedTestsDemo {
@Test
@Order(1)
void nullValues() {
// perform assertions against null values
}
@Test
@Order(2)
void emptyValues() {
// perform assertions against empty values
}
@Test
@Order(3)
void validValues() {
// perform assertions against valid values
}
}
使用 HSQL DB
运行模式
HSQL DB 是一款纯Java编写的免费数据库,并且支持Memory-Only模型,所以每个单测使用独立的数据库就有了可能,Memory-Only模型数据不会持久化跑完即销毁,从根源上解决了测试难以复现和测试对数据的污染问题。
版本问题
maven 依赖
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>2.5.2</version>
<scope>test</scope>
</dependency>
根据官网:
Latest version 2.7.0 works with JDK 8 and above. Version for JDK 6 is also available.
但是这不代表在测试的时候可以直接调用 2.7.0 版本的代码,因为该版本使用 jdk 11 编译,在使用 IDEA 测试时,直接引用会报错,所以版本应该选择 2.5x .
代码:
public class TestHSQL{
@BeforeAll
static void startHSQL(){
try {
Class.forName(DBProperties.HSQL_DTIVER_NAME);
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
...
}
错误信息:
java.lang.UnsupportedClassVersionError: org/hsqldb/jdbcDriver has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
语法问题
HSQL 与 MySQL 语法并不相同,HSQL语法,英文文档读起来稍微有点费劲。
CREATE TABLE student1 (
student_id int(32) PRIMARY KEY AUTO_INCREMENT NOT NULL,
student_name varchar(100) NOT NULL,
student_address varchar(100) NOT NULL
);
如上的sql语句在程序中要这么写,差别还是很多的,对于大部分在标准 SQL 语句之外的语句都需要先检查是否使用了正确的语法。
st.executeUpdate("CREATE TABLE student (" +
"student_id INTEGER GENERATED BY DEFAULT AS IDENTITY " +
"(START WITH 1, INCREMENT BY 1) NOT NULL," +
"student_name VARCHAR(100) NOT NULL," +
"student_address VARCHAR(100) NOT NULL," +
"PRIMARY KEY (student_id)" +
");");
其实还有个取巧的方法,可以使用参数直接开启mysql
语法支持,但依然有些语法MySQL
特点HSQKDB
并不支持,此时应参考HDBSQL
所实现的SQL92
标准。
"jdbc:hsqldb:mem:test;sql.syntax_mys=true"
使用示例
import com.fanruan.AgentStarter;
import com.fanruan.ServerStater;
import com.fanruan.service.jdbc.driver.ServiceDriver;
import com.fanruan.proxy.ProxyFactory;
import com.fanruan.utils.DBProperties;
import org.junit.jupiter.api.*;
import java.sql.*;
import java.util.Properties;
/**
* @author Yichen Dai
* @date 2022/8/18 15:27
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class TestUtil {
static Connection conn = null;
static Statement st = null;
static PreparedStatement pst = null;
static ResultSet rs = null;
static void configService(){
// 启动socket服务器
String[][] DBs = new String[][]{
DBProperties.HSQL,
};
new ServerStater(DBs);
}
static void configAgent(){
// 启动socket客户端
String[][] DBs = new String[][]{
DBProperties.HSQL,
};
new AgentStarter(DBs);
}
@BeforeAll
static void autoConfig(){
configService();
configAgent();
try {
// 等待socket连接
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
@Order(1)
void testConnect() throws SQLException {
// 建立连接
Properties info = new Properties();
info.setProperty("user", "sa");
info.setProperty("password", "");
info.setProperty("agentID", "1001");
info.setProperty("agentDBName", DBProperties.HSQL[0]);
Driver driver = (ServiceDriver) ProxyFactory.getProxy(ServiceDriver.class, null);
conn = driver.connect("jdbc:hsqldb:mem:test;sql.syntax_mys=true", info);
}
@Test
@Order(2)
void testCreateTable1() throws SQLException {
// 创建 statement
st = conn.createStatement();
// 创建表
int num = st.executeUpdate("DROP TABLE student IF EXISTS;");
Assertions.assertEquals(0, num);
num = st.executeUpdate("CREATE TABLE student (" +
"student_id INTEGER GENERATED BY DEFAULT AS IDENTITY " +
"(START WITH 1, INCREMENT BY 1) NOT NULL," +
"student_name VARCHAR(100) NOT NULL," +
"student_address VARCHAR(100) NOT NULL," +
"PRIMARY KEY (student_id)" +
");");
Assertions.assertEquals(0, num);
}
@Test
@Order(3)
void testCreateTable2() throws SQLException {
// 创建表
int num = st.executeUpdate("DROP TABLE student_score IF EXISTS;");
Assertions.assertEquals(0, num);
num = st.executeUpdate("CREATE TABLE score (" +
"student_id int(10) PRIMARY KEY NOT NULL," +
"score int(10) NOT NULL" +
");"
);
Assertions.assertEquals(0, num);
}
@Test
@Order(4)
void testInsert1() throws SQLException {
// 插入数据
int num = st.executeUpdate("INSERT INTO student VALUES" +
"(1, '张三', '上海')," +
"(2, '李四', '北京')," +
"(3, '王五', '成都');");
Assertions.assertEquals(3, num);
}
@Test
@Order(5)
void testInsert2() throws SQLException {
// 插入数据
int num = st.executeUpdate("INSERT INTO score VALUES" +
"(1, 645)," +
"(2, 627)," +
"(3, 591);");
Assertions.assertEquals(3, num);
}
@Test
@Order(6)
void testUpdate() throws SQLException {
// 预查询语句 删除指定 ID
pst = conn.prepareStatement("UPDATE student" +
" SET student_name = '李华', student_address = '杭州'"+
"WHERE student_id = ?");
Assertions.assertNotNull(pst);
pst.setInt(1, 1);
int num = pst.executeUpdate();
Assertions.assertEquals(1, num);
}
@Test
@Order(7)
void testDelete() throws SQLException {
// 预查询语句 删除指定 ID
pst = conn.prepareStatement("delete from student where student_id = ?");
Assertions.assertNotNull(pst);
pst.setInt(1, 3);
int num = pst.executeUpdate();
Assertions.assertEquals(1, num);
}
@Test
@Order(8)
void testSelect() throws SQLException {
rs = st.executeQuery("select * from student;");
String[] nameStrings = new String[]{"李华", "李四", "王五"};
String[] addressString = new String[]{"杭州", "北京", "成都"};
// 结果集断言
int num = 1;
while(rs.next()) {
Assertions.assertEquals(num, rs.getInt("student_id"));
Assertions.assertEquals(nameStrings[num-1], rs.getString("student_name"));
Assertions.assertEquals(addressString[num-1], rs.getString("student_address"));
num++;
}
}
@Test
@Order(9)
void testSubSelect() throws SQLException {
// 插入数据
rs = st.executeQuery(
"SELECT student_name FROM student " +
"WHERE student_id IN " +
"(SELECT student_id " +
"FROM score " +
"WHERE score > 600);"
);
String[] nameStrings = new String[]{"李华", "李四", "王五"};
// 结果集断言
int num = 1;
while(rs.next()) {
Assertions.assertEquals(nameStrings[num-1], rs.getString("student_name"));
num++;
}
}
@Test
@Order(10)
void testJoin() throws SQLException {
// 插入数据
rs = st.executeQuery(
"SELECT A.student_name " +
"FROM student A JOIN score B " +
"ON A.student_id = B.student_id " +
"WHERE score > 600;"
);
String[] nameStrings = new String[]{"李华", "李四", "王五"};
// 结果集断言
int num = 1;
while (rs.next()) {
Assertions.assertEquals(rs.getString("student_name"), nameStrings[num - 1]);
num++;
}
}
}
开发手册
Service 添加JDBC
类
实现接口
1. 以`Service` + `接口名`命名一个类
2. 实现接口
3. 继承`BasedBind`类
4. 对于返回类属于绑定类的情况
1. 使用代理工厂返回替身类
2. 如果该类也有会返回绑定类的方法,对该类调用`setInfo(info)`
5. 为该类添加`RemoteClass(remoteClassName = Agent上对应类的全限定类名)`注解。
继承抽象类
-
以
Service
+抽象类名
命名一个类 -
继承抽象类
-
粘贴所需方法和字段
private String ID; @LocalMethod public String getID(){ return this.ID; } @LocalMethod public void setID(String ID){ this.ID = ID; } @LocalMethod @Override public int hashCode(){ return super.hashCode(); } @LocalMethod @Override public boolean equals(Object obj) { return super.equals(obj); } @LocalMethod @Override protected Object clone() throws CloneNotSupportedException { return super.clone(); } @LocalMethod @Override public String toString() { return super.toString(); } @LocalMethod @Override protected void finalize() throws Throwable { super.finalize(); }
-
对于返回类属于绑定类的情况(包括抽象方法)
- 使用代理工厂返回替身类
- 如果该类也有会返回绑定类的方法,对该类调用
setInfo(info)
-
为该类添加
RemoteClass(remoteClassName = Agent上对应类的全限定类名)
注解。
-
Agent 添加 JDBC
类
实现接口
1. 以`Agent + 接口名`命名一个类
2. 实现接口
3. 组合一个接口类型的成员变量,创建一个入参为该接口类型的构造方法
4. 重写该接口的所有方法
1. 返回值为普通变量,直接用3中成员变量调用该方法
2. 返回值为绑定对象,返回包装对象:`new Agent+接口名(成员变量.方法(参数))`
3. 链式调用方法return this
5. 为该类添加`BindClass`注解
继承抽象类
1. 以`Agent + 继承类名`命名一个类
2. 继承类
3. 组合一个接口类型的成员变量,创建一个入参为该接口类型的构造方法
4. 重写该接口的所有方法(包括继承的抽象方法)
1. 返回值为普通变量,直接用3中成员变量调用该方法
2. 返回值为绑定对象,返回包装对象:`new Agent+接口名(成员变量.方法(参数))`
3. 链式调用方法return this
5. 为该类添加`BindClass`注解