diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java index 8284171fc1..9014b82276 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java @@ -30,6 +30,10 @@ import io.protostuff.runtime.RuntimeSchema; */ public class ProtoStuffUtils { + private ProtoStuffUtils() { + throw new IllegalStateException("Utility class"); + } + private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); private static Map, Schema> schemaCache = new ConcurrentHashMap<>(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfigCache.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfigCache.java index a407079fc5..4c8c9f4a07 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfigCache.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerConfigCache.java @@ -24,6 +24,10 @@ import java.util.concurrent.ConcurrentHashMap; */ public class ConsumerConfigCache { + private ConsumerConfigCache() { + throw new IllegalStateException("Utility class"); + } + private static ConcurrentHashMap consumerMap = new ConcurrentHashMap<>(); public static ConsumerConfig getConfigByServersName(String serviceName) { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java index f976d0b1d6..47bb699c9f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/ConsumerInterceptor.java @@ -75,11 +75,8 @@ public class ConsumerInterceptor { request.setClassName(method.getDeclaringClass().getSimpleName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); - request.setParameters(args); - String serviceName = method.getDeclaringClass().getName(); - return request; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java index 5f5e32e486..0a62a15447 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/client/RpcRequestTable.java @@ -24,6 +24,10 @@ import java.util.concurrent.ConcurrentHashMap; */ public class RpcRequestTable { + private RpcRequestTable() { + throw new IllegalStateException("Utility class"); + } + private static ConcurrentHashMap requestMap = new ConcurrentHashMap<>(); public static void put(String requestId, RpcRequestCache rpcRequestCache) { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java index 478c10432c..def8fe10d4 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ConsumerConfigConstants.java @@ -22,6 +22,10 @@ package org.apache.dolphinscheduler.rpc.common; */ public class ConsumerConfigConstants { + private ConsumerConfigConstants() { + throw new IllegalStateException("Utility class"); + } + public static final Boolean DEFAULT_SYNC = false; public static final Integer DEFAULT_RETRIES = 3; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ThreadPoolManager.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ThreadPoolManager.java index cb4c33a23a..1ddd574e0d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ThreadPoolManager.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/common/ThreadPoolManager.java @@ -29,13 +29,12 @@ public enum ThreadPoolManager { ExecutorService executorService; + private static final int WORK_QUEUE_SIZE = 200; + private static final long KEEP_ALIVE_TIME = 60; + ThreadPoolManager() { - int SIZE_WORK_QUEUE = 200; - long KEEP_ALIVE_TIME = 60; - int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; - int MAXI_MUM_POOL_SIZE = CORE_POOL_SIZE * 4; - executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXI_MUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(SIZE_WORK_QUEUE), + executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 4, KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(WORK_QUEUE_SIZE), new DiscardPolicy()); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java index f1c9e01767..6369d16725 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/config/ServiceBean.java @@ -36,10 +36,14 @@ public class ServiceBean { private static final Logger logger = LoggerFactory.getLogger(ServiceBean.class); - private static Map serviceMap = new HashMap<>(); + private static Map serviceMap = new HashMap<>(); private static AtomicBoolean initialized = new AtomicBoolean(false); + private ServiceBean() { + throw new IllegalStateException("Utility class"); + } + private static synchronized void init() { // todo config Reflections f = new Reflections("org/apache/dolphinscheduler/rpc"); @@ -47,16 +51,17 @@ public class ServiceBean { list.forEach(rpcClass -> { RpcService rpcService = rpcClass.getAnnotation(RpcService.class); serviceMap.put(rpcService.value(), rpcClass); + logger.info("load rpc service {}", rpcService.value()); }); } public static Class getServiceClass(String className) { if (initialized.get()) { - return (Class) serviceMap.get(className); + return serviceMap.get(className); } else { init(); } - return (Class) serviceMap.get(className); + return serviceMap.get(className); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java index 28db8e5f32..caabe929f7 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java @@ -199,7 +199,7 @@ public class NettyClient { String serviceName = request.getClassName() + request.getMethodName(); rpcRequestCache.setServiceName(serviceName); RpcFuture future = null; - if (!async) { + if (Boolean.FALSE.equals(async)) { future = new RpcFuture(request); rpcRequestCache.setRpcFuture(future); } @@ -207,13 +207,14 @@ public class NettyClient { channel.writeAndFlush(request); RpcResponse result = null; - if (async) { + if (Boolean.TRUE.equals(async)) { result = new RpcResponse(); result.setStatus((byte) 0); result.setResult(true); return result; } try { + assert future != null; result = future.get(); } catch (InterruptedException | ExecutionException e) { logger.error("send msg error,service name is {}", serviceName, e); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java index ec4f97255a..d145e34395 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClientHandler.java @@ -21,13 +21,13 @@ import org.apache.dolphinscheduler.rpc.client.ConsumerConfig; import org.apache.dolphinscheduler.rpc.client.ConsumerConfigCache; import org.apache.dolphinscheduler.rpc.client.RpcRequestCache; import org.apache.dolphinscheduler.rpc.client.RpcRequestTable; +import org.apache.dolphinscheduler.rpc.common.RequestEventType; import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; import org.apache.dolphinscheduler.rpc.future.RpcFuture; import java.lang.reflect.InvocationTargetException; -import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +36,6 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.concurrent.FastThreadLocalThread; /** * NettyClientHandler @@ -47,11 +46,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); - private final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE; + private static final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE; @Override public void channelInactive(ChannelHandlerContext ctx) { - InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); ctx.channel().close(); } @@ -70,7 +68,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private void readHandler(RpcResponse rsp, RpcRequestCache rpcRequest) { String serviceName = rpcRequest.getServiceName(); ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName); - if (!consumerConfig.getAsync()) { + if (Boolean.FALSE.equals(consumerConfig.getAsync())) { RpcFuture future = rpcRequest.getRpcFuture(); RpcRequestTable.remove(rsp.getRequestId()); future.done(rsp); @@ -78,31 +76,26 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { } //async - new FastThreadLocalThread(() -> { + + if (rsp.getStatus() == 0) { + try { - if (rsp.getStatus() == 0) { - try { - consumerConfig.getServiceCallBackClass().getDeclaredConstructor().newInstance().run(rsp.getResult()); - } catch (InvocationTargetException | NoSuchMethodException e) { - logger.error("rpc call back error, serviceName {} ", serviceName, e); - } - - } else { - logger.error("rpc response error ,serviceName {}", serviceName); - } - } catch (InstantiationException | IllegalAccessException e) { - logger.error("execute async error,serviceName {}", serviceName, e); + consumerConfig.getServiceCallBackClass().getDeclaredConstructor().newInstance().run(rsp.getResult()); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + logger.error("rpc service call back error,serviceName {},rsp {}", serviceName, rsp); } - }).start(); + } else { + logger.error("rpc response error ,serviceName {},rsp {}", serviceName, rsp); + } + } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - IdleStateEvent event = (IdleStateEvent) evt; RpcRequest request = new RpcRequest(); - request.setEventType((byte) 0); + request.setEventType(RequestEventType.HEARTBEAT.getType()); ctx.channel().writeAndFlush(request); logger.debug("send heart beat msg..."); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java index ae4ccaab6e..28586f856c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyServerHandler.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.rpc.remote; +import org.apache.dolphinscheduler.rpc.common.RequestEventType; import org.apache.dolphinscheduler.rpc.common.RpcRequest; import org.apache.dolphinscheduler.rpc.common.RpcResponse; import org.apache.dolphinscheduler.rpc.common.ThreadPoolManager; @@ -38,12 +39,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); - private final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE; - - @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - super.channelRegistered(ctx); - } + private static final ThreadPoolManager threadPoolManager = ThreadPoolManager.INSTANCE; @Override public void channelInactive(ChannelHandlerContext ctx) { @@ -61,7 +57,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { RpcRequest req = (RpcRequest) msg; - if (req.getEventType() == 0) { + if (req.getEventType().equals(RequestEventType.HEARTBEAT.getType())) { logger.info("accept heartbeat msg"); return; @@ -111,7 +107,6 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - System.out.println("exceptionCaught"); logger.error("exceptionCaught : {}", cause.getMessage(), cause); ctx.channel().close(); }