diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java index b0fd3893ed..7bac0361f4 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java @@ -56,7 +56,7 @@ public class NettyServerConfig { /** * listen port */ - private int listenPort = 12336; + private int listenPort = 12636; public int getListenPort() { return listenPort; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java index 87bf405920..66ef806f17 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java @@ -9,5 +9,7 @@ import org.apache.dolphinscheduler.remote.rpc.base.Rpc; public interface IUserService { @Rpc(async = true,callback = UserCallback.class) - String say(String sb); + Boolean say(String sb); + + String hi(int num); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java index df5f8d0257..f295917ca4 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java @@ -18,13 +18,21 @@ public class MainTest { // NettyClient nettyClient=new NettyClient(new NettyClientConfig()); - Host host = new Host("127.0.0.1", 12366); + Host host = new Host("127.0.0.1", 12636); IRpcClient rpcClient = new RpcClient(); - UserService userService = rpcClient.create(UserService.class); - String result = userService.say("calvin"); + IUserService userService = rpcClient.create(UserService.class,host); + boolean result = userService.say("calvin"); System.out.println( "异步回掉成功"+result); + System.out.println(userService.hi(10)); + System.out.println(userService.hi(188888888)); + + UserService user = rpcClient.create(UserService.class,host); + System.out.println(user.hi(99999)); + System.out.println(user.hi(998888888)); + // UserCallback.class.newInstance().run("lllll"); + // nettyClient.sendMsg(host,rpcRequest); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java index 4bd60996cb..fe712dfbe3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserCallback.java @@ -9,7 +9,7 @@ import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; public class UserCallback extends AbstractRpcCallBack { @Override public void run(Object object) { - String msg= (String) object; - System.out.println("我是异步回调"+msg); + Boolean msg= (Boolean) object; + System.out.println("我是异步回调handle Kris"+msg); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java index 8485d10f56..6614e401e3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java @@ -6,10 +6,15 @@ import org.apache.dolphinscheduler.remote.rpc.base.Rpc; * @author jiangli * @date 2021-01-11 21:05 */ -public class UserService implements IUserService{ +public class UserService implements IUserService { @Override - @Rpc(async = true,callback = UserCallback.class,retries = 9999,isOneway = false) - public String say(String s) { - return "krris"+s; + @Rpc(async = true, callback = UserCallback.class, retries = 9999, isOneway = false) + public Boolean say(String s) { + return true; + } + + @Override + public String hi(int num) { + return "this world has " + num + "sun"; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java index 898a1c0827..4cd99a153b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java @@ -7,13 +7,13 @@ import java.util.concurrent.ConcurrentHashMap; */ public class ConsumerConfigCache { - private static ConcurrentHashMap consumerMap=new ConcurrentHashMap<>(); + private static ConcurrentHashMap consumerMap = new ConcurrentHashMap<>(); - public static ConsumerConfig getConfigByServersName(String serviceName){ + public static ConsumerConfig getConfigByServersName(String serviceName) { return consumerMap.get(serviceName); } - public static void putConfig(String serviceName,ConsumerConfig consumerConfig){ - consumerMap.putIfAbsent(serviceName,consumerConfig); + public static void putConfig(String serviceName, ConsumerConfig consumerConfig) { + consumerMap.putIfAbsent(serviceName, consumerConfig); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java index ccc0fb358d..f98b67c504 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java @@ -1,20 +1,20 @@ package org.apache.dolphinscheduler.remote.rpc.client; -import net.bytebuddy.implementation.bind.annotation.AllArguments; -import net.bytebuddy.implementation.bind.annotation.Origin; -import net.bytebuddy.implementation.bind.annotation.RuntimeType; - -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.rpc.Invoker; import org.apache.dolphinscheduler.remote.rpc.base.Rpc; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain; import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; import org.apache.dolphinscheduler.remote.utils.Host; import java.lang.reflect.Method; import java.util.UUID; -import java.util.function.Consumer; + +import net.bytebuddy.implementation.bind.annotation.AllArguments; +import net.bytebuddy.implementation.bind.annotation.Origin; +import net.bytebuddy.implementation.bind.annotation.RuntimeType; /** * ConsumerInterceptor @@ -26,30 +26,41 @@ public class ConsumerInterceptor { private FilterChain filterChain; - private NettyClient nettyClient=new NettyClient(new NettyClientConfig()); + private Host host; + + private NettyClient nettyClient = NettyClient.getInstance(); - public ConsumerInterceptor(Invoker invoker) { + public ConsumerInterceptor(Invoker invoker, Host host) { this.filterChain = new FilterChain(invoker); this.invoker = this.filterChain.buildFilterChain(); + this.host = host; } @RuntimeType - public Object intercept(@AllArguments Object[] args, @Origin Method method) throws Throwable { + public Object intercept(@AllArguments Object[] args, @Origin Method method) throws RemotingException { RpcRequest request = buildReq(args, method); - String serviceName = method.getDeclaringClass().getName() + method; + String serviceName = method.getDeclaringClass().getName() + method.getName(); ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName); if (null == consumerConfig) { consumerConfig = cacheServiceConfig(method, serviceName); } boolean async = consumerConfig.getAsync(); - //load balance - Host host = new Host("127.0.0.1", 12336); + int retries = consumerConfig.getRetries(); + + while (retries-- > 0) { + RpcResponse rsp = (RpcResponse) nettyClient.sendMsg(host, request, async); + //success + if (null != rsp && rsp.getStatus() == 0) { + return rsp.getResult(); + } + } + // execute fail + throw new RemotingException("send msg error"); - return nettyClient.sendMsg(host, request, async); } private RpcRequest buildReq(Object[] args, Method method) { @@ -61,7 +72,7 @@ public class ConsumerInterceptor { request.setParameters(args); - String serviceName = method.getDeclaringClass().getName() + method; + String serviceName = method.getDeclaringClass().getName(); return request; } @@ -77,6 +88,7 @@ public class ConsumerInterceptor { consumerConfig.setRetries(rpc.retries()); consumerConfig.setOneway(rpc.isOneway()); } + ConsumerConfigCache.putConfig(serviceName, consumerConfig); return consumerConfig; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java index e0e538c044..abdc5d7289 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java @@ -1,11 +1,13 @@ package org.apache.dolphinscheduler.remote.rpc.client; +import org.apache.dolphinscheduler.remote.utils.Host; + /** * IRpcClient */ public interface IRpcClient { - T create(Class clazz) throws Exception; + T create(Class clazz, Host host) throws Exception; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java index 5fa8f45f51..1009750cb9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java @@ -1,15 +1,14 @@ package org.apache.dolphinscheduler.remote.rpc.client; -import net.bytebuddy.ByteBuddy; -import net.bytebuddy.implementation.MethodDelegation; import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.apache.dolphinscheduler.remote.rpc.base.Rpc; -import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; +import org.apache.dolphinscheduler.remote.utils.Host; import java.util.concurrent.ConcurrentHashMap; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.implementation.MethodDelegation; + /** * RpcClient */ @@ -18,18 +17,19 @@ public class RpcClient implements IRpcClient{ private ConcurrentHashMap classMap=new ConcurrentHashMap<>(); @Override - public T create(Class clazz) throws Exception { - if(!classMap.containsKey(clazz.getName())){ + public T create(Class clazz,Host host) throws Exception { + // if(!classMap.containsKey(clazz.getName())){ T proxy = new ByteBuddy() .subclass(clazz) - .method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(new ConsumerInvoker()))) + .method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(new ConsumerInvoker(),host))) .make() .load(getClass().getClassLoader()) .getLoaded() .getDeclaredConstructor().newInstance(); - classMap.putIfAbsent(clazz.getName(),proxy); - } - return (T) classMap.get(clazz.getName()); + // classMap.putIfAbsent(clazz.getName(),proxy); + return proxy; + // } + // return (T) classMap.get(clazz.getName()); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java index 845f832526..344a6e4a9b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java @@ -1,7 +1,5 @@ package org.apache.dolphinscheduler.remote.rpc.client; -import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; - import java.util.concurrent.ConcurrentHashMap; /** diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java index 9a72801365..2e2b1a734e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java @@ -33,15 +33,15 @@ public class RpcFuture implements Future { } @Override - public Object get() throws InterruptedException, ExecutionException { + public RpcResponse get() throws InterruptedException, ExecutionException { boolean b = latch.await(5,TimeUnit.SECONDS); - return response.getResult(); + return response; } @Override - public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { boolean b = latch.await(timeout,unit); - return response.getResult(); + return response; } public void done(RpcResponse response){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java index c8a2317878..a5f13a8343 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java @@ -1,31 +1,13 @@ package org.apache.dolphinscheduler.remote.rpc.remote; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.CharsetUtil; - -import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; - +import org.apache.dolphinscheduler.remote.decoder.NettyDecoder; import org.apache.dolphinscheduler.remote.decoder.NettyEncoder; -import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; -import org.apache.dolphinscheduler.remote.serialize.ProtoStuffUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NettyUtils; @@ -41,11 +23,33 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; + /** * NettyClient */ public class NettyClient { + public static NettyClient getInstance() { + return NettyClient.NettyClientInner.INSTANCE; + } + + private static class NettyClientInner { + + private static final NettyClient INSTANCE = new NettyClient(new NettyClientConfig()); + } + private final Logger logger = LoggerFactory.getLogger(NettyClient.class); /** @@ -160,38 +164,43 @@ public class NettyClient { @Override public void initChannel(SocketChannel ch) { ch.pipeline() - .addLast(new NettyEncoder(RpcRequest.class)) //OUT - 1 + .addLast(new NettyEncoder(RpcRequest.class)) .addLast(new NettyDecoder(RpcResponse.class)) .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) - .addLast(new NettyClientHandler()); } }); isStarted.compareAndSet(false, true); - System.out.println("netty client start"); } - public Object sendMsg(Host host, RpcRequest request, Boolean async) { + public RpcResponse sendMsg(Host host, RpcRequest request, Boolean async) { - System.out.println("这个不是异步"+async); Channel channel = getChannel(host); assert channel != null; RpcRequestCache rpcRequestCache = new RpcRequestCache(); rpcRequestCache.setServiceName(request.getClassName() + request.getMethodName()); - RpcFuture future = new RpcFuture(); - rpcRequestCache.setRpcFuture(future); + + + RpcFuture future = null; + if (!async) { + future = new RpcFuture(); + rpcRequestCache.setRpcFuture(future); + } RpcRequestTable.put(request.getRequestId(), rpcRequestCache); channel.writeAndFlush(request); - Object result = null; + RpcResponse result = null; if (async) { - return true; + result=new RpcResponse(); + result.setStatus((byte)0); + result.setResult(true); + return result; } try { result = future.get(); } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); + logger.error("send msg error",e); } return result; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java index 5e965e6063..691c725577 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java @@ -3,9 +3,11 @@ package org.apache.dolphinscheduler.remote.rpc.remote; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.concurrent.FastThreadLocalThread; +import org.apache.dolphinscheduler.remote.rpc.client.ConsumerConfig; +import org.apache.dolphinscheduler.remote.rpc.client.ConsumerConfigCache; import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; @@ -36,11 +38,36 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) { RpcResponse rsp = (RpcResponse) msg; RpcRequestCache rpcRequest = RpcRequestTable.get(rsp.getRequestId()); - if (null != rpcRequest) { + + if (null == rpcRequest) { + logger.warn("未知响应"); + return; + } + + String serviceName = rpcRequest.getServiceName(); + ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName); + if (!consumerConfig.getAsync()) { RpcFuture future = rpcRequest.getRpcFuture(); RpcRequestTable.remove(rsp.getRequestId()); future.done(rsp); + return; + } + + //async + new FastThreadLocalThread(() -> { + try { + if (rsp.getStatus() == 0) { + consumerConfig.getCallBackClass().newInstance().run(rsp.getResult()); + } else { + logger.error("xxxx fail"); + } + } catch (InstantiationException | IllegalAccessException e) { + logger.error("execute async error", e); + } + }).start(); + + } @Override @@ -51,7 +78,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { RpcRequest request = new RpcRequest(); request.setMethodName("heart"); ctx.channel().writeAndFlush(request); - logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息..."); + logger.debug("send heart beat msg..."); } else { super.userEventTriggered(ctx, evt); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java index 47bc3e70aa..9cd7f2a4d6 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java @@ -1,18 +1,17 @@ package org.apache.dolphinscheduler.remote.rpc.remote; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.timeout.IdleStateEvent; - import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; + /** * NettyServerHandler */ @@ -39,17 +38,19 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException, InstantiationException { + public void channelRead(ChannelHandlerContext ctx, Object msg) { RpcRequest req = (RpcRequest) msg; RpcResponse response = new RpcResponse(); - if(req.getMethodName().equals("heart")){ + if (req.getMethodName().equals("heart")) { logger.info("接受心跳消息!..."); return; } response.setRequestId(req.getRequestId()); + response.setStatus((byte) 0); + String classname = req.getClassName(); @@ -58,14 +59,20 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { Class[] parameterTypes = req.getParameterTypes(); Object[] arguments = req.getParameters(); + Object result = null; + try { - Class serviceClass = Class.forName(classname); + Class serviceClass = Class.forName(classname); - Object object = serviceClass.newInstance(); + Object object = serviceClass.newInstance(); - Method method = serviceClass.getMethod(methodName, parameterTypes); + Method method = serviceClass.getMethod(methodName, parameterTypes); - Object result = method.invoke(object, arguments); + result = method.invoke(object, arguments); + } catch (Exception e) { + logger.error("netty server execute error",e); + response.setStatus((byte)-1); + } response.setResult(result); ctx.writeAndFlush(response); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java index b48017ebf2..0a8c1b3666 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java @@ -8,8 +8,7 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; /** - * @author jiangli - * @date 2021-01-11 12:00 + * RandomSelector */ public class RandomSelector extends AbstractSelector {