From 939ee1f2f5e88316c48333fb880871fe95ea7c8a Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Mon, 18 Jan 2021 09:20:25 +0800 Subject: [PATCH] rpc --- .../dolphinscheduler/remote/rpc/MainTest.java | 13 +++++-------- .../remote/rpc/common/RpcRequest.java | 10 ++++++++++ .../remote/rpc/remote/NettyClientHandler.java | 2 +- .../remote/rpc/remote/NettyServer.java | 6 +++--- .../remote/rpc/remote/NettyServerHandler.java | 7 ++++--- 5 files changed, 23 insertions(+), 15 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java index f295917ca4..2296782273 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java @@ -1,7 +1,6 @@ package org.apache.dolphinscheduler.remote.rpc; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; - import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient; import org.apache.dolphinscheduler.remote.rpc.client.RpcClient; import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer; @@ -14,6 +13,8 @@ import org.apache.dolphinscheduler.remote.utils.Host; public class MainTest { public static void main(String[] args) throws Exception { + + NettyServer nettyServer = new NettyServer(new NettyServerConfig()); // NettyClient nettyClient=new NettyClient(new NettyClientConfig()); @@ -21,19 +22,15 @@ public class MainTest { Host host = new Host("127.0.0.1", 12636); IRpcClient rpcClient = new RpcClient(); - IUserService userService = rpcClient.create(UserService.class,host); + IUserService userService = rpcClient.create(UserService.class, host); boolean result = userService.say("calvin"); - System.out.println( "异步回掉成功"+result); + System.out.println("异步回掉成功" + result); System.out.println(userService.hi(10)); System.out.println(userService.hi(188888888)); - UserService user = rpcClient.create(UserService.class,host); + UserService user = rpcClient.create(UserService.class, host); System.out.println(user.hi(99999)); System.out.println(user.hi(998888888)); - // UserCallback.class.newInstance().run("lllll"); - - // nettyClient.sendMsg(host,rpcRequest); - } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java index 833ad0c73b..2cb4b23b7f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java @@ -10,6 +10,16 @@ public class RpcRequest { private String methodName; private Class[] parameterTypes; private Object[] parameters; + // 0 hear beat,1 businness msg + private Byte eventType=1; + + public Byte getEventType() { + return eventType; + } + + public void setEventType(Byte eventType) { + this.eventType = eventType; + } public String getRequestId() { return requestId; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java index 691c725577..d6c9387b5b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java @@ -76,7 +76,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; RpcRequest request = new RpcRequest(); - request.setMethodName("heart"); + request.setEventType((byte)0); ctx.channel().writeAndFlush(request); logger.debug("send heart beat msg..."); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java index ed88345945..091a697ff3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java @@ -128,7 +128,7 @@ public class NettyServer { .childHandler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(SocketChannel ch){ initNettyChannel(ch); } }); @@ -137,11 +137,11 @@ public class NettyServer { try { future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); } catch (Exception e) { - //logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); + logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); } if (future.isSuccess()) { - // logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); + logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); } else if (future.cause() != null) { throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); } else { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java index 9cd7f2a4d6..0b75d37b66 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java @@ -43,7 +43,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { RpcRequest req = (RpcRequest) msg; RpcResponse response = new RpcResponse(); - if (req.getMethodName().equals("heart")) { + if (req.getEventType() == 0) { + logger.info("接受心跳消息!..."); return; } @@ -70,8 +71,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { result = method.invoke(object, arguments); } catch (Exception e) { - logger.error("netty server execute error",e); - response.setStatus((byte)-1); + logger.error("netty server execute error", e); + response.setStatus((byte) -1); } response.setResult(result);