Browse Source

rpc

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
939ee1f2f5
  1. 13
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java
  2. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java
  3. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java
  4. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java
  5. 7
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java

13
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java

@ -1,7 +1,6 @@
package org.apache.dolphinscheduler.remote.rpc; package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient; import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient;
import org.apache.dolphinscheduler.remote.rpc.client.RpcClient; import org.apache.dolphinscheduler.remote.rpc.client.RpcClient;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer; import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer;
@ -14,6 +13,8 @@ import org.apache.dolphinscheduler.remote.utils.Host;
public class MainTest { public class MainTest {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
NettyServer nettyServer = new NettyServer(new NettyServerConfig()); NettyServer nettyServer = new NettyServer(new NettyServerConfig());
// NettyClient nettyClient=new NettyClient(new NettyClientConfig()); // NettyClient nettyClient=new NettyClient(new NettyClientConfig());
@ -21,19 +22,15 @@ public class MainTest {
Host host = new Host("127.0.0.1", 12636); Host host = new Host("127.0.0.1", 12636);
IRpcClient rpcClient = new RpcClient(); IRpcClient rpcClient = new RpcClient();
IUserService userService = rpcClient.create(UserService.class,host); IUserService userService = rpcClient.create(UserService.class, host);
boolean result = userService.say("calvin"); boolean result = userService.say("calvin");
System.out.println( "异步回掉成功"+result); System.out.println("异步回掉成功" + result);
System.out.println(userService.hi(10)); System.out.println(userService.hi(10));
System.out.println(userService.hi(188888888)); System.out.println(userService.hi(188888888));
UserService user = rpcClient.create(UserService.class,host); UserService user = rpcClient.create(UserService.class, host);
System.out.println(user.hi(99999)); System.out.println(user.hi(99999));
System.out.println(user.hi(998888888)); System.out.println(user.hi(998888888));
// UserCallback.class.newInstance().run("lllll");
// nettyClient.sendMsg(host,rpcRequest);
} }
} }

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java

@ -10,6 +10,16 @@ public class RpcRequest {
private String methodName; private String methodName;
private Class<?>[] parameterTypes; private Class<?>[] parameterTypes;
private Object[] parameters; private Object[] parameters;
// 0 hear beat,1 businness msg
private Byte eventType=1;
public Byte getEventType() {
return eventType;
}
public void setEventType(Byte eventType) {
this.eventType = eventType;
}
public String getRequestId() { public String getRequestId() {
return requestId; return requestId;

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java

@ -76,7 +76,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt; IdleStateEvent event = (IdleStateEvent) evt;
RpcRequest request = new RpcRequest(); RpcRequest request = new RpcRequest();
request.setMethodName("heart"); request.setEventType((byte)0);
ctx.channel().writeAndFlush(request); ctx.channel().writeAndFlush(request);
logger.debug("send heart beat msg..."); logger.debug("send heart beat msg...");

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java

@ -128,7 +128,7 @@ public class NettyServer {
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch){
initNettyChannel(ch); initNettyChannel(ch);
} }
}); });
@ -137,11 +137,11 @@ public class NettyServer {
try { try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) { } catch (Exception e) {
//logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
} }
if (future.isSuccess()) { if (future.isSuccess()) {
// logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) { } else if (future.cause() != null) {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
} else { } else {

7
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java

@ -43,7 +43,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
RpcRequest req = (RpcRequest) msg; RpcRequest req = (RpcRequest) msg;
RpcResponse response = new RpcResponse(); RpcResponse response = new RpcResponse();
if (req.getMethodName().equals("heart")) { if (req.getEventType() == 0) {
logger.info("接受心跳消息!..."); logger.info("接受心跳消息!...");
return; return;
} }
@ -70,8 +71,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
result = method.invoke(object, arguments); result = method.invoke(object, arguments);
} catch (Exception e) { } catch (Exception e) {
logger.error("netty server execute error",e); logger.error("netty server execute error", e);
response.setStatus((byte)-1); response.setStatus((byte) -1);
} }
response.setResult(result); response.setResult(result);

Loading…
Cancel
Save