From ed164fe3257d6f6e358a1b581fdc3f36f508fd27 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Thu, 14 Jan 2021 14:57:02 +0800 Subject: [PATCH] rpc --- .../remote/rpc/IUserService.java | 2 +- .../dolphinscheduler/remote/rpc/MainTest.java | 15 +++++++----- .../remote/rpc/UserService.java | 4 ++-- .../rpc/client/ConsumerInterceptor.java | 6 +++++ .../remote/rpc/client/RpcClient.java | 5 ++++ .../remote/rpc/remote/NettyServerHandler.java | 23 ++++++++++++++++++- 6 files changed, 45 insertions(+), 10 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java index e834995028..23ae0c2586 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java @@ -6,5 +6,5 @@ package org.apache.dolphinscheduler.remote.rpc; */ public interface IUserService { - String say(); + String say(String sb); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java index 204d9d4150..eab927161b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java @@ -3,6 +3,8 @@ package org.apache.dolphinscheduler.remote.rpc; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient; +import org.apache.dolphinscheduler.remote.rpc.client.RpcClient; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer; @@ -17,16 +19,17 @@ public class MainTest { public static void main(String[] args) throws Exception { NettyServer nettyServer=new NettyServer(new NettyServerConfig()); - NettyClient nettyClient=new NettyClient(new NettyClientConfig()); + // NettyClient nettyClient=new NettyClient(new NettyClientConfig()); Host host=new Host("127.0.0.1",12366); - RpcRequest rpcRequest=new RpcRequest(); - rpcRequest.setRequestId("988"); - rpcRequest.setClassName("kris"); - rpcRequest.setMethodName("ll"); + IRpcClient rpcClient=new RpcClient(); + IUserService userService= rpcClient.create(IUserService.class); + userService.say("calvin"); - nettyClient.sendMsg(host,rpcRequest); + + + // nettyClient.sendMsg(host,rpcRequest); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java index bf1e6623e3..0dfc08ad2c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java @@ -6,7 +6,7 @@ package org.apache.dolphinscheduler.remote.rpc; */ public class UserService implements IUserService{ @Override - public String say() { - return null; + public String say(String s) { + return "krris"+s; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java index e34db2dd9c..e8821f7ab9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java @@ -4,9 +4,12 @@ import net.bytebuddy.implementation.bind.annotation.AllArguments; import net.bytebuddy.implementation.bind.annotation.Origin; import net.bytebuddy.implementation.bind.annotation.RuntimeType; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.rpc.Invoker; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain; +import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; +import org.apache.dolphinscheduler.remote.utils.Host; import java.lang.reflect.Method; import java.util.UUID; @@ -32,6 +35,9 @@ public class ConsumerInterceptor { RpcRequest request = buildReq(args, method); //todo System.out.println(invoker.invoke(request)); + NettyClient nettyClient = new NettyClient(new NettyClientConfig()); + Host host = new Host("127.0.0.1", 12366); + nettyClient.sendMsg(host, request); return null; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java index 83b64fcc31..b9efdf8f9f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java @@ -4,6 +4,9 @@ import net.bytebuddy.ByteBuddy; import net.bytebuddy.implementation.MethodDelegation; import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; + import java.util.concurrent.ConcurrentHashMap; /** @@ -15,6 +18,8 @@ public class RpcClient implements IRpcClient{ private ConcurrentHashMap classMap=new ConcurrentHashMap<>(); + + @Override public T create(Class clazz) throws Exception { if(!classMap.containsKey(clazz.getName())){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java index 9ee9ff2197..f5e68a38bd 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java @@ -5,6 +5,10 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,11 +40,28 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { logger.info("server read msg"); System.out.println("收到消息"); RpcRequest req= (RpcRequest) msg; System.out.println(req.getRequestId()); + RpcResponse response=new RpcResponse(); + response.setMsg("llll"); + response.setRequestId(req.getRequestId()); + + Class handlerClass = req.getClass(); + System.out.println(req.getMethodName()); + System.out.println(req.getClassName()); + String methodName = req.getMethodName(); + Class[] parameterTypes = req.getParameterTypes(); + Object[] parameters = req.getParameters(); + + // JDK reflect + Method method = handlerClass.getMethod(methodName, parameterTypes); + method.setAccessible(true); + Object result = method.invoke(req.getClassName(), parameters); + response.setResult(result); + ctx.writeAndFlush(response); } @Override