Browse Source

rpc

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
985a4fc99c
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
  2. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java
  3. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java
  4. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java
  5. 33
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java

@ -56,7 +56,7 @@ public class NettyServerConfig {
/** /**
* listen port * listen port
*/ */
private int listenPort = 12366; private int listenPort = 12336;
public int getListenPort() { public int getListenPort() {
return listenPort; return listenPort;

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java

@ -24,7 +24,7 @@ public class MainTest {
Host host=new Host("127.0.0.1",12366); Host host=new Host("127.0.0.1",12366);
IRpcClient rpcClient=new RpcClient(); IRpcClient rpcClient=new RpcClient();
IUserService userService= rpcClient.create(IUserService.class); IUserService userService= rpcClient.create(UserService.class);
userService.say("calvin"); userService.say("calvin");

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java

@ -36,7 +36,7 @@ public class ConsumerInterceptor {
//todo //todo
System.out.println(invoker.invoke(request)); System.out.println(invoker.invoke(request));
NettyClient nettyClient = new NettyClient(new NettyClientConfig()); NettyClient nettyClient = new NettyClient(new NettyClientConfig());
Host host = new Host("127.0.0.1", 12366); Host host = new Host("127.0.0.1", 12336);
nettyClient.sendMsg(host, request); nettyClient.sendMsg(host, request);
return null; return null;

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java

@ -47,7 +47,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到消息"); System.out.println("收到消息");
RpcResponse rsp = (RpcResponse) msg; RpcResponse rsp = (RpcResponse) msg;
System.out.println(rsp); System.out.println(rsp.getResult().toString());
} }
@Override @Override

33
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java

@ -40,7 +40,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
} }
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { public void channelRead(ChannelHandlerContext ctx, Object msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException, InstantiationException {
logger.info("server read msg"); logger.info("server read msg");
System.out.println("收到消息"); System.out.println("收到消息");
RpcRequest req= (RpcRequest) msg; RpcRequest req= (RpcRequest) msg;
@ -49,17 +49,26 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
response.setMsg("llll"); response.setMsg("llll");
response.setRequestId(req.getRequestId()); response.setRequestId(req.getRequestId());
Class<?> handlerClass = req.getClass();
System.out.println(req.getMethodName()); String classname=req.getClassName();
System.out.println(req.getClassName()); //获得服务端要调用的方法名称
String methodName = req.getMethodName(); String methodName=req.getMethodName();
Class<?>[] parameterTypes = req.getParameterTypes(); //获得服务端要调用方法的参数类型
Object[] parameters = req.getParameters(); Class<?>[] parameterTypes=req.getParameterTypes();
//获得服务端要调用方法的每一个参数的值
// JDK reflect Object[] arguments=req.getParameters();
Method method = handlerClass.getMethod(methodName, parameterTypes);
method.setAccessible(true); //创建类
Object result = method.invoke(req.getClassName(), parameters); Class serviceClass=Class.forName(classname);
//创建对象
Object object = serviceClass.newInstance();
//获得该类的对应的方法
Method method=serviceClass.getMethod(methodName, parameterTypes);
//该对象调用指定方法
Object result=method.invoke(object, arguments);
response.setResult(result); response.setResult(result);
ctx.writeAndFlush(response); ctx.writeAndFlush(response);
} }

Loading…
Cancel
Save