Browse Source

rpc

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
e950b65945
  1. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java
  2. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java
  3. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java
  4. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java
  5. 15
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java
  6. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java
  7. 25
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java
  8. 61
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java
  9. 19
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java
  10. 40
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java
  11. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java
  12. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java
  13. 7
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java
  14. 29
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java
  15. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java
  16. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java
  17. 14
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java
  18. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java
  19. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java
  20. 22
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java
  21. 47
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java
  22. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java
  23. 54
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java
  24. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyDecoder.java

@ -9,21 +9,19 @@ import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils;
import java.util.List;
/**
* @author jiangli
* @date 2021-01-12 18:53
* NettyDecoder
*/
public class NettyDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
// 构造函数传入向反序列化的class
public NettyDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
System.out.println("decoder");
if (byteBuf.readableBytes() < 4) {
return;
}

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/decoder/NettyEncoder.java

@ -11,24 +11,22 @@ import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils;
* @author jiangli
* @date 2021-01-12 18:52
*/
public class NettyEncoder extends MessageToByteEncoder {
public class NettyEncoder extends MessageToByteEncoder {
private Class<?> genericClass;
// 构造函数传入向反序列化的class
public NettyEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
System.out.println("encsss");
if (genericClass.isInstance(o)) {
byte[] data = ProtoStuffUtils.serialize(o);
byteBuf.writeInt(data.length);
byteBuf.writeBytes(data);
}
}
}

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java

@ -1,10 +1,13 @@
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
/**
* @author jiangli
* @date 2021-01-11 21:05
*/
public interface IUserService {
@Rpc(async = true,callback = UserCallback.class)
String say(String sb);
}

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java

@ -1,12 +1,9 @@
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient;
import org.apache.dolphinscheduler.remote.rpc.client.RpcClient;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyServer;
import org.apache.dolphinscheduler.remote.utils.Host;
@ -26,8 +23,7 @@ public class MainTest {
IRpcClient rpcClient = new RpcClient();
UserService userService = rpcClient.create(UserService.class);
String result = userService.say("calvin");
System.out.println("我是你爸爸吧"+result);
System.out.println( "异步回掉成功"+result);
// nettyClient.sendMsg(host,rpcRequest);

15
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java

@ -0,0 +1,15 @@
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack;
/**
* @author jiangli
* @date 2021-01-15 07:32
*/
public class UserCallback extends AbstractRpcCallBack {
@Override
public void run(Object object) {
String msg= (String) object;
System.out.println("我是异步回调"+msg);
}
}

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java

@ -1,11 +1,14 @@
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
/**
* @author jiangli
* @date 2021-01-11 21:05
*/
public class UserService implements IUserService{
@Override
@Rpc(async = true,callback = UserCallback.class,retries = 9999,isOneway = false)
public String say(String s) {
return "krris"+s;
}

25
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java

@ -0,0 +1,25 @@
package org.apache.dolphinscheduler.remote.rpc.base;
import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Rpc
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Rpc {
int retries() default 3;
boolean async() default false;
boolean isOneway() default true;
Class<? extends AbstractRpcCallBack> callback() default AbstractRpcCallBack.class;
}

61
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java

@ -0,0 +1,61 @@
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack;
import org.apache.dolphinscheduler.remote.rpc.common.ConsumerConfigConstants;
/**
* ConsumerConfig
*/
public class ConsumerConfig {
private Class<? extends AbstractRpcCallBack> callBackClass;
private String serviceName;
private Boolean async = ConsumerConfigConstants.DEFAULT_SYNC;
private Boolean isOneway = ConsumerConfigConstants.DEFAULT_IS_ONEWAY;
private Integer retries = ConsumerConfigConstants.DEFAULT_RETRIES;
public Class<? extends AbstractRpcCallBack> getCallBackClass() {
return callBackClass;
}
public void setCallBackClass(Class<? extends AbstractRpcCallBack> callBackClass) {
this.callBackClass = callBackClass;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public Boolean getAsync() {
return async;
}
public void setAsync(Boolean async) {
this.async = async;
}
public Boolean getOneway() {
return isOneway;
}
public void setOneway(Boolean oneway) {
isOneway = oneway;
}
public Integer getRetries() {
return retries;
}
public void setRetries(Integer retries) {
this.retries = retries;
}
}

19
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java

@ -0,0 +1,19 @@
package org.apache.dolphinscheduler.remote.rpc.client;
import java.util.concurrent.ConcurrentHashMap;
/**
* ConsumerConfigCache
*/
public class ConsumerConfigCache {
private static ConcurrentHashMap<String, ConsumerConfig> consumerMap=new ConcurrentHashMap<>();
public static ConsumerConfig getConfigByServersName(String serviceName){
return consumerMap.get(serviceName);
}
public static void putConfig(String serviceName,ConsumerConfig consumerConfig){
consumerMap.putIfAbsent(serviceName,consumerConfig);
}
}

40
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java

@ -6,6 +6,7 @@ import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
@ -13,6 +14,7 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.function.Consumer;
/**
* ConsumerInterceptor
@ -24,6 +26,8 @@ public class ConsumerInterceptor {
private FilterChain filterChain;
private NettyClient nettyClient=new NettyClient(new NettyClientConfig());
public ConsumerInterceptor(Invoker invoker) {
this.filterChain = new FilterChain(invoker);
this.invoker = this.filterChain.buildFilterChain();
@ -33,13 +37,19 @@ public class ConsumerInterceptor {
@RuntimeType
public Object intercept(@AllArguments Object[] args, @Origin Method method) throws Throwable {
RpcRequest request = buildReq(args, method);
//todo
System.out.println(invoker.invoke(request));
NettyClient nettyClient = new NettyClient(new NettyClientConfig());
Host host = new Host("127.0.0.1", 12336);
return nettyClient.sendMsg(host, request);
String serviceName = method.getDeclaringClass().getName() + method;
ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
if (null == consumerConfig) {
consumerConfig = cacheServiceConfig(method, serviceName);
}
boolean async = consumerConfig.getAsync();
//load balance
Host host = new Host("127.0.0.1", 12336);
return nettyClient.sendMsg(host, request, async);
}
private RpcRequest buildReq(Object[] args, Method method) {
@ -48,8 +58,28 @@ public class ConsumerInterceptor {
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
String serviceName = method.getDeclaringClass().getName() + method;
return request;
}
private ConsumerConfig cacheServiceConfig(Method method, String serviceName) {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setServiceName(serviceName);
boolean annotationPresent = method.isAnnotationPresent(Rpc.class);
if (annotationPresent) {
Rpc rpc = method.getAnnotation(Rpc.class);
consumerConfig.setAsync(rpc.async());
consumerConfig.setCallBackClass(rpc.callback());
consumerConfig.setRetries(rpc.retries());
consumerConfig.setOneway(rpc.isOneway());
}
ConsumerConfigCache.putConfig(serviceName, consumerConfig);
return consumerConfig;
}
}

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java

@ -5,8 +5,7 @@ import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
/**
* @author jiangli
* @date 2021-01-09 15:27
* ConsumerInvoker
*/
public class ConsumerInvoker implements Invoker {
@Override

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java

@ -1,8 +1,7 @@
package org.apache.dolphinscheduler.remote.rpc.client;
/**
* @author jiangli
* @date 2021-01-09 10:58
* IRpcClient
*/
public interface IRpcClient {

7
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java

@ -5,21 +5,18 @@ import net.bytebuddy.implementation.MethodDelegation;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author jiangli
* @date 2021-01-09 10:59
* RpcClient
*/
public class RpcClient implements IRpcClient{
private ConcurrentHashMap<String,Object> classMap=new ConcurrentHashMap<>();
@Override
public <T> T create(Class<T> clazz) throws Exception {
if(!classMap.containsKey(clazz.getName())){

29
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java

@ -0,0 +1,29 @@
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture;
/**
* RpcRequestCache
*/
public class RpcRequestCache {
private RpcFuture rpcFuture;
private String serviceName;
public RpcFuture getRpcFuture() {
return rpcFuture;
}
public void setRpcFuture(RpcFuture rpcFuture) {
this.rpcFuture = rpcFuture;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
}

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java

@ -5,24 +5,23 @@ import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author jiangli
* @date 2021-01-14 10:42
* RpcRequestTable
*/
public class RpcRequestTable {
// key: requestId value: RpcFuture
private static ConcurrentHashMap<String, RpcFuture> processingRpc = new ConcurrentHashMap<>();
public static void put(String requestId,RpcFuture rpcFuture){
processingRpc.put(requestId,rpcFuture);
private static ConcurrentHashMap<String, RpcRequestCache> requestMap = new ConcurrentHashMap<>();
public static void put(String requestId,RpcRequestCache rpcRequestCache){
requestMap.put(requestId,rpcRequestCache);
}
public static RpcFuture get(String requestId){
return processingRpc.get(requestId);
public static RpcRequestCache get(String requestId){
return requestMap.get(requestId);
}
public static void remove(String requestId){
processingRpc.remove(requestId);
requestMap.remove(requestId);
}
}

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java

@ -0,0 +1,10 @@
package org.apache.dolphinscheduler.remote.rpc.common;
/**
* AbstractRpcCallBack
*/
public abstract class AbstractRpcCallBack {
public abstract void run(Object object);
}

14
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java

@ -0,0 +1,14 @@
package org.apache.dolphinscheduler.remote.rpc.common;
/**
* ConsumerConfigConstants
*/
public class ConsumerConfigConstants {
public static final Boolean DEFAULT_SYNC = false;
public static final Boolean DEFAULT_IS_ONEWAY = false;
public static final Integer DEFAULT_RETRIES = 3;
}

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java

@ -1,8 +1,7 @@
package org.apache.dolphinscheduler.remote.rpc.common;
/**
* @author jiangli
* @date 2021-01-09 13:21
* RpcRequest
*/
public class RpcRequest {

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java

@ -9,8 +9,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author jiangli
* @date 2021-01-14 09:24
* RpcFuture
*/
public class RpcFuture implements Future<Object> {

22
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java

@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.decoder.NettyEncoder;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
@ -41,8 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author jiangli
* @date 2021-01-13 19:31
* NettyClient
*/
public class NettyClient {
@ -172,24 +172,30 @@ public class NettyClient {
System.out.println("netty client start");
}
public Object sendMsg(Host host, RpcRequest request) {
public Object sendMsg(Host host, RpcRequest request, Boolean async) {
System.out.println("这个不是异步"+async);
Channel channel = getChannel(host);
assert channel != null;
RpcRequestCache rpcRequestCache = new RpcRequestCache();
rpcRequestCache.setServiceName(request.getClassName() + request.getMethodName());
RpcFuture future = new RpcFuture();
RpcRequestTable.put(request.getRequestId(), future);
rpcRequestCache.setRpcFuture(future);
RpcRequestTable.put(request.getRequestId(), rpcRequestCache);
channel.writeAndFlush(request);
Object result = null;
if (async) {
return true;
}
try {
result=future.get();
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return result;
}
/**
* close
*/

47
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java

@ -1,16 +1,12 @@
package org.apache.dolphinscheduler.remote.rpc.remote;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
@ -22,8 +18,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author jiangli
* @date 2021-01-13 13:33
* NettyClientHandler
*/
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@ -31,45 +26,35 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) {
// ctx.channel().close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("client 关闭channel");
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
ctx.channel().close();
//todo connectManage.removeChannel(ctx.channel());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到消息");
public void channelRead(ChannelHandlerContext ctx, Object msg) {
RpcResponse rsp = (RpcResponse) msg;
RpcFuture rpcFuture= RpcRequestTable.get(rsp.getRequestId());
if(null!=rpcFuture){
RpcRequestCache rpcRequest = RpcRequestTable.get(rsp.getRequestId());
if (null != rpcRequest) {
RpcFuture future = rpcRequest.getRpcFuture();
RpcRequestTable.remove(rsp.getRequestId());
rpcFuture.done(rsp);
future.done(rsp);
}
System.out.println(rsp.getResult().toString());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.ALL_IDLE){
RpcRequest request = new RpcRequest();
request.setMethodName("heartBeat");
ctx.channel().writeAndFlush(request);
logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息...");
}
}else{
super.userEventTriggered(ctx,evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
RpcRequest request = new RpcRequest();
request.setMethodName("heart");
ctx.channel().writeAndFlush(request);
logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息...");
} else {
super.userEventTriggered(ctx, evt);
}
}

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java

@ -31,8 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author jiangli
* @date 2021-01-13 19:32
* NettyServer
*/
public class NettyServer {

54
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java

@ -14,12 +14,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author jiangli
* @date 2021-01-13 19:20
* NettyServerHandler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
@ -27,47 +27,45 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx){
public void channelInactive(ChannelHandlerContext ctx) {
logger.info("channel close");
ctx.channel().close();
ctx.channel().close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接成功!"+ctx.channel().remoteAddress());
logger.info("客户端连接成功!"+ctx.channel().remoteAddress());
public void channelActive(ChannelHandlerContext ctx) {
logger.info("client connect success !" + ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException, InstantiationException {
logger.info("server read msg");
System.out.println("收到消息");
RpcRequest req= (RpcRequest) msg;
System.out.println(req.getRequestId());
RpcResponse response=new RpcResponse();
response.setMsg("llll");
RpcRequest req = (RpcRequest) msg;
RpcResponse response = new RpcResponse();
if(req.getMethodName().equals("heart")){
logger.info("接受心跳消息!...");
return;
}
response.setRequestId(req.getRequestId());
String classname=req.getClassName();
//获得服务端要调用的方法名称
String methodName=req.getMethodName();
//获得服务端要调用方法的参数类型
Class<?>[] parameterTypes=req.getParameterTypes();
//获得服务端要调用方法的每一个参数的值
Object[] arguments=req.getParameters();
String classname = req.getClassName();
String methodName = req.getMethodName();
Class<?>[] parameterTypes = req.getParameterTypes();
Object[] arguments = req.getParameters();
Class serviceClass = Class.forName(classname);
//创建类
Class serviceClass=Class.forName(classname);
//创建对象
Object object = serviceClass.newInstance();
//获得该类的对应的方法
Method method=serviceClass.getMethod(methodName, parameterTypes);
//该对象调用指定方法
Object result=method.invoke(object, arguments);
Method method = serviceClass.getMethod(methodName, parameterTypes);
Object result = method.invoke(object, arguments);
response.setResult(result);
ctx.writeAndFlush(response);
@ -83,7 +81,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("exceptionCaught");
logger.error("exceptionCaught : {}", cause.getMessage(), cause);
ctx.channel().close();

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java

@ -9,8 +9,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author jiangli
* @date 2021-01-12 18:56
* ProtoStuffUtils
*/
public class ProtoStuffUtils {

Loading…
Cancel
Save